hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r1077437 [4/5] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/test/aop/build/ src/test/org/apache/hadoop/mapred/ src/test/system/aop/org/apache/hadoop/hdfs/ src/test/system/aop/org/apache/hadoop/hdfs/server/ src/test/syst...
Date: Fri, 04 Mar 2011 04:14:55 GMT
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java?rev=1077437&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java Fri Mar  4 04:14:53 2011
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.net.URI;
+import java.util.Collection;
+import java.util.ArrayList;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.UtilsForTests;
+
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.examples.SleepJob;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+/**
+ * Verify the Distributed Cache functionality.
+ * This test scenario covers the behaviour of a distributed cache file
+ * that is not modified between accesses by at most two jobs.
+ * Once a job uses a distributed cache file, that file is stored under
+ * mapred.local.dir. If the next job uses the same file, it is not
+ * stored again. So, if two jobs are scheduled on the same tasktracker,
+ * the distributed cache file should not be found there twice.
+ *
+ * This testcase runs a job with a distributed cache file. The handle of
+ * each tasktracker that ran one of the job's tasks is obtained and
+ * checked for the presence of the distributed cache file, with the
+ * proper permissions, in the proper directory. When a job runs again
+ * and any of its tasks hits a tasktracker that ran a task of the
+ * previous job, the file should not be uploaded again and the task
+ * should use the existing copy. This is verified.
+ */
+
+public class TestDistributedCacheUnModifiedFile {
+
+  private static MRCluster cluster = null;
+  private static FileSystem dfs = null;
+  private static FileSystem ttFs = null;
+  private static JobClient client = null;
+  private static FsPermission permission = new FsPermission((short)00777);
+
+  private static String uriPath = "hdfs:///tmp/test.txt";
+  private static final Path URIPATH = new Path(uriPath);
+  private String distributedFileName = "test.txt";
+
+  static final Log LOG = LogFactory.
+                           getLog(TestDistributedCacheUnModifiedFile.class);
+
+  public TestDistributedCacheUnModifiedFile() throws Exception {
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+    client = cluster.getJTClient().getClient();
+    dfs = client.getFs();
+    //Deleting the file if it already exists
+    dfs.delete(URIPATH, true);
+
+    Collection<TTClient> tts = cluster.getTTClients();
+    //Stopping all TTs
+    for (TTClient tt : tts) {
+      tt.kill();
+    }
+    //Starting all TTs
+    for (TTClient tt : tts) {
+      tt.start();
+    }
+   
+    //Waiting for 5 seconds to make sure tasktrackers are ready
+    Thread.sleep(5000);
+
+    String input = "This will be the content of\n" + "distributed cache\n";
+    //Creating the path with the file
+    DataOutputStream file = 
+        UtilsForTests.createTmpFileDFS(dfs,URIPATH,permission,input);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    cluster.tearDown();
+    dfs.delete(URIPATH, true);
+    
+    Collection<TTClient> tts = cluster.getTTClients();
+    //Stopping all TTs
+    for (TTClient tt : tts) {
+      tt.kill();
+    }
+    //Starting all TTs
+    for (TTClient tt : tts) {
+      tt.start();
+    }
+  }
+
+  /**
+   * Tests the Distributed Cache for an unmodified file.
+   */
+  @Test
+  public void testDistributedCache() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+    JTProtocol wovenClient = cluster.getJTClient().getProxy();
+
+    //This counter guards a loop which might otherwise become infinite.
+    int count = 0;
+    //This boolean decides whether to run the job again
+    boolean continueLoop = true;
+    //counter for the job loop
+    int countLoop = 0;
+    //This counter increases with each tasktracker on which tasks ran
+    int taskTrackerCounter = 0;
+    //This list stores all the tasktrackers on which tasks ran
+    ArrayList<String> taskTrackerCollection = new ArrayList<String>();
+
+    do {
+      SleepJob job = new SleepJob();
+      job.setConf(conf);
+      conf = job.setupJobConf(5, 1, 1000, 1000, 100, 100);
+
+      DistributedCache.createSymlink(conf);
+      URI uri = URI.create(uriPath);
+      DistributedCache.addCacheFile(uri, conf);
+      JobConf jconf = new JobConf(conf);
+ 
+      //Controls the job till all verification is done 
+      FinishTaskControlAction.configureControlActionForJob(conf);
+
+      //Submitting the job
+      RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+
+      //counter for job Loop
+      countLoop++;
+
+      TTClient tClient = null;
+      JobInfo jInfo = wovenClient.getJobInfo(rJob.getID());
+      LOG.info("jInfo is :" + jInfo);
+
+      //Assert that jobInfo is not null
+      Assert.assertNotNull("jobInfo is null", jInfo);
+
+      //Wait for the job to start running.
+      count = 0;
+      while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+        UtilsForTests.waitFor(10000);
+        count++;
+        jInfo = wovenClient.getJobInfo(rJob.getID());
+        //If the count goes beyond a point, fail; this avoids an
+        //infinite loop under unforeseen circumstances.
+        if (count > 10) {
+          Assert.fail("Job has not reached the running state for more than " +
+            "100 seconds. Failing at this point");
+        }
+      }
+
+      LOG.info("job id is :" + rJob.getID().toString());
+
+      TaskInfo[] taskInfos = cluster.getJTClient().getProxy()
+             .getTaskInfo(rJob.getID());
+
+      boolean distCacheFileIsFound;
+       
+      for (TaskInfo taskInfo : taskInfos) {
+        distCacheFileIsFound = false;
+        String[] taskTrackers = taskInfo.getTaskTrackers();
+        for (String taskTracker : taskTrackers) {
+          //Formatting tasktracker to get just its FQDN 
+          taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+          LOG.info("taskTracker is :" + taskTracker);
+
+          //This will be entered from the second job onwards
+          if (countLoop > 1) {
+            if (taskTracker != null) {
+              continueLoop = taskTrackerCollection.contains(taskTracker);
+            }
+            if (!continueLoop) {
+              break;
+            }
+          }
+
+          //Collecting the tasktrackers
+          if (taskTracker != null)  
+            taskTrackerCollection.add(taskTracker);
+
+          //We have looped enough times looking for tasks getting
+          //scheduled on the same tasktrackers. The same tasktracker was
+          //not hit for subsequent jobs, perhaps because there are many
+          //tasktrackers, so the testcase has to stop here.
+          if (countLoop > 2) {
+            continueLoop = false;
+          }
+
+          tClient = cluster.getTTClient(taskTracker);
+
+          //tClient may be null because the task is already dead. Ex: setup
+          if (tClient == null) {
+            continue;
+          }
+
+          String[] localDirs = tClient.getMapredLocalDirs();
+          int distributedFileCount = 0;
+          //Go to every single path
+          for (String localDir : localDirs) {
+            //Public Distributed cache will always be stored under
+            //mapred.local.dir/tasktracker/archive
+            localDir = localDir + Path.SEPARATOR + 
+                   TaskTracker.getPublicDistributedCacheDir();
+            LOG.info("localDir is : " + localDir);
+
+            //Get file status of all the directories 
+            //and files under that path.
+            FileStatus[] fileStatuses = tClient.listStatus(localDir, 
+                true, true);
+            for (FileStatus  fileStatus : fileStatuses) {
+              Path path = fileStatus.getPath();
+              LOG.info("path is :" + path.toString());
+              //Checking if the received path ends with 
+              //the distributed filename
+              distCacheFileIsFound = (path.toString()).
+                  endsWith(distributedFileName);
+              //If the file is found, check its permission and
+              //count it.
+              if (distCacheFileIsFound){
+                LOG.info("PATH found is :" + path.toString());
+                distributedFileCount++;
+                String filename = path.getName();
+                FsPermission fsPerm = fileStatus.getPermission();
+                Assert.assertTrue("File Permission is not 777", 
+                    fsPerm.equals(new FsPermission("777")));
+              }
+            }
+          }
+
+          // Since the distributed cache file is unmodified in dfs
+          // between the two job runs, it should not be present more than
+          // once on any of the tasktrackers on which the job ran.
+          if (distributedFileCount > 1) {
+            Assert.fail("The distributed cache file is present more than once");
+          } else if (distributedFileCount < 1) {
+            Assert.fail("The distributed cache file is not present");
+          }
+          if (!distCacheFileIsFound) {
+            Assert.assertEquals("The distributed cache file does not exist",
+                distCacheFileIsFound, false);
+          }
+        }
+      }
+      //Allow the job to continue through the MR control action.
+      for (TaskInfo taskInfoRemaining : taskInfos) {
+        FinishTaskControlAction action = new FinishTaskControlAction(TaskID
+           .downgrade(taskInfoRemaining.getTaskID()));
+        Collection<TTClient> tts = cluster.getTTClients();
+        for (TTClient cli : tts) {
+          cli.getProxy().sendAction(action);
+        }
+      }
+
+      //Killing the job because all the verification needed
+      //for this testcase is completed.
+      rJob.killJob();
+    } while (continueLoop);
+  }
+}
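
For context, the client-side setup the test above exercises is only the DistributedCache registration done before job submission (the addCacheFile call inside the job loop). The following is a minimal, self-contained sketch of that pattern, assuming the cache file already exists in HDFS and reusing the same SleepJob the test uses; the class name CacheFileJobSketch is illustrative and not part of the commit.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.examples.SleepJob;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RunningJob;

    public class CacheFileJobSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        SleepJob job = new SleepJob();
        job.setConf(conf);
        conf = job.setupJobConf(5, 1, 1000, 1000, 100, 100);
        // Symlink the cached file into each task's working directory.
        DistributedCache.createSymlink(conf);
        // Register the HDFS file; each tasktracker localizes it once and
        // reuses the local copy as long as the file is unmodified in HDFS.
        DistributedCache.addCacheFile(URI.create("hdfs:///tmp/test.txt"), conf);
        JobConf jconf = new JobConf(conf);
        RunningJob rJob = new JobClient(jconf).submitJob(jconf);
        rJob.waitForCompletion();
      }
    }

The test's assertion that the localized copy appears exactly once per tasktracker follows from this reuse behaviour.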

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestFileOwner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestFileOwner.java?rev=1077437&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestFileOwner.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestFileOwner.java Fri Mar  4 04:14:53 2011
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFileOwner {
+  public static MRCluster cluster;
+
+  private StringBuffer jobIdDir = new StringBuffer();
+  private JTProtocol wovenClient = null;
+  private static final Log LOG = LogFactory.getLog(TestFileOwner.class);
+  private String taskController = null;
+  private final FsPermission PERM_777 = new FsPermission("777");
+  private final FsPermission PERM_755 = new FsPermission("755");
+  private final FsPermission PERM_644 = new FsPermission("644");
+
+  @BeforeClass
+  public static void setUp() throws java.lang.Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+  }
+
+  /*
+   * This test checks the permissions of the local files created
+   * under mapred.local.dir. Job control is used so that the tasks
+   * wait until they are signalled to finish.
+   *
+   * @throws Exception in case of test errors
+   */
+  @Test
+  public void testFilePermission() throws Exception {
+    wovenClient = cluster.getJTClient().getProxy();
+    Configuration conf = new Configuration(cluster.getConf());
+    FinishTaskControlAction.configureControlActionForJob(conf);
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    conf = job.setupJobConf(1, 0, 100, 100, 100, 100);
+    JobConf jconf = new JobConf(conf);
+    RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+    taskController = conf.get("mapred.task.tracker.task-controller");
+    // get the job info so we can get the env variables from the daemon.
+    // Now wait for the task to be in the running state; only then
+    // will the directories be created.
+    JobInfo info = wovenClient.getJobInfo(rJob.getID());
+    Assert.assertNotNull("JobInfo is null",info);
+    JobID id = rJob.getID();
+    while (info.runningMaps() != 1) {
+      Thread.sleep(1000);
+      info = wovenClient.getJobInfo(id);
+    }
+    TaskInfo[] myTaskInfos = wovenClient.getTaskInfo(id);
+    for (TaskInfo tInfo : myTaskInfos) {
+      if (!tInfo.isSetupOrCleanup()) {
+        String[] taskTrackers = tInfo.getTaskTrackers();
+        for (String taskTracker : taskTrackers) {
+          TTInfo ttInfo = wovenClient.getTTInfo(taskTracker);
+          TTClient ttCli = cluster.getTTClient(ttInfo.getStatus().getHost());
+          Assert.assertNotNull("TTClient instance is null",ttCli);
+          TTTaskInfo ttTaskInfo = ttCli.getProxy().getTask(tInfo.getTaskID());
+          Assert.assertNotNull("TTTaskInfo is null",ttTaskInfo);
+          while (ttTaskInfo.getTaskStatus().getRunState() !=
+                 TaskStatus.State.RUNNING) {
+            Thread.sleep(100);
+            ttTaskInfo = ttCli.getProxy().getTask(tInfo.getTaskID());
+          }
+          testPermissionWithTaskController(ttCli, conf, info);
+          FinishTaskControlAction action = new FinishTaskControlAction(TaskID
+              .downgrade(tInfo.getTaskID()));
+          for (TTClient cli : cluster.getTTClients()) {
+            cli.getProxy().sendAction(action);
+          }
+        }
+      }
+    }
+    JobInfo jInfo = wovenClient.getJobInfo(id);
+    jInfo = cluster.getJTClient().getProxy().getJobInfo(id);
+    while (!jInfo.getStatus().isJobComplete()) {
+      Thread.sleep(100);
+      jInfo = cluster.getJTClient().getProxy().getJobInfo(id);
+    }
+  }
+
+  private void testPermissionWithTaskController(TTClient tClient,
+      Configuration conf,
+      JobInfo info) {
+    Assert.assertNotNull("TTclient is null",tClient);
+    FsPermission fsPerm = null;
+    String[] pathInfo = conf.getStrings("mapred.local.dir");
+    for (int i = 0; i < pathInfo.length; i++) {
+      // First verify the jobid directory exists
+      jobIdDir = new StringBuffer();
+      String userName = null;
+      try {
+        JobStatus[] jobStatus = cluster.getJTClient().getClient().getAllJobs();
+        userName = jobStatus[0].getUsername();
+      } catch (Exception ex) {
+        LOG.error("Failed to get the user name", ex);
+        Assert.fail("Failed to get the userName");
+      }
+      jobIdDir.append(pathInfo[i]).append(Path.SEPARATOR);
+      jobIdDir.append(TaskTracker.getLocalJobDir(userName,
+                                   info.getID().toString()));
+      FileStatus[] fs = null;
+      try {
+        fs = tClient.listStatus(jobIdDir.toString(), true);
+      } catch (Exception ex) {
+        LOG.error("Failed to get the jobIdDir files " + ex);
+      }
+      Assert.assertEquals("Filestatus length is zero",fs.length != 0, true);
+      for (FileStatus file : fs) {
+        try {
+          String filename = file.getPath().getName();
+          if (filename.equals(TaskTracker.JOBFILE)) {
+            if (DefaultTaskController.class.getName().equals(taskController)) {
+              fsPerm = file.getPermission();
+              Assert.assertTrue("FilePermission failed for "+filename,
+                  fsPerm.equals(PERM_777));
+            }
+          }
+          if (filename.startsWith("attempt")) {
+            StringBuffer attemptDir = new StringBuffer(jobIdDir);
+            attemptDir.append(Path.SEPARATOR).append(filename);
+            if (tClient.getFileStatus(attemptDir.toString(), true) != null) {
+              FileStatus[] attemptFs = tClient.listStatus(
+                  attemptDir.toString(), true, true);
+              for (FileStatus attemptfz : attemptFs) {
+                Assert.assertNotNull("FileStatus is null",attemptfz);
+                fsPerm = attemptfz.getPermission();
+                Assert.assertNotNull("FsPermission is null",fsPerm);
+                if (DefaultTaskController.class.getName().equals(taskController)) {
+                  if (!attemptfz.isDir()) {
+                    Assert.assertTrue("FilePermission failed for "+filename,
+                        fsPerm.equals(PERM_777));
+                  } else {
+                    Assert.assertTrue("FilePermission failed for "+filename,
+                        fsPerm.equals(PERM_755));
+                  }
+                }
+              }
+            }
+          }
+          if (filename.equals(TaskTracker.TASKJARDIR)) {
+            StringBuffer jarsDir = new StringBuffer(jobIdDir);
+            jarsDir.append(Path.SEPARATOR).append(filename);
+            FileStatus[] jarsFs = tClient.listStatus(jarsDir.toString(), true,
+                true);
+            for (FileStatus jarsfz : jarsFs) {
+              Assert.assertNotNull("FileStatus is null",jarsfz);
+              fsPerm = jarsfz.getPermission();
+              Assert.assertNotNull("File permission is null",fsPerm);
+              if (DefaultTaskController.class.getName().equals(taskController)) {
+                if (!jarsfz.isDir()) {
+                  if (jarsfz.getPath().getName().equals("job.jar")) {
+                    Assert.assertTrue("FilePermission failed for "+filename,
+                        fsPerm.equals(PERM_777));
+                  } else {
+                    Assert.assertTrue("FilePermission failed for "+filename,
+                        fsPerm.equals(PERM_644));
+                  }
+                } else {
+                  Assert.assertTrue("FilePermission failed for "+filename,
+                      fsPerm.equals(PERM_755));
+                }
+              }
+            }
+          }
+        } catch (Exception ex) {
+          LOG.error("The exception occurred while searching for nonexsistent"
+              + "file, ignoring and continuing. " + ex);
+        }
+      }// for loop ends
+    }// for loop ends
+  }
+
+  @AfterClass
+  public static void tearDown() throws java.lang.Exception {
+    cluster.tearDown();
+  }
+}
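
The permission checks above boil down to walking the per-disk job directory under each mapred.local.dir entry and comparing FsPermission values. Below is a rough standalone sketch of that walk, placed in the org.apache.hadoop.mapred package so TaskTracker.getLocalJobDir resolves as it does in the test, and using the local FileSystem directly rather than the Herriot TTClient proxy; the class name and the jobcache layout mentioned in the comment are assumptions for illustration.

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class LocalJobDirPermissionSketch {
      /** Prints the permission of every entry under the job's local dirs. */
      public static void dumpPermissions(Configuration conf, String user,
          String jobId) throws Exception {
        FileSystem localFs = FileSystem.getLocal(conf);
        for (String dir : conf.getStrings("mapred.local.dir")) {
          // Typically <local dir>/taskTracker/<user>/jobcache/<jobid>
          Path jobDir = new Path(dir, TaskTracker.getLocalJobDir(user, jobId));
          if (!localFs.exists(jobDir)) {
            continue;
          }
          for (FileStatus status : localFs.listStatus(jobDir)) {
            FsPermission perm = status.getPermission();
            System.out.println(status.getPath() + " -> " + perm);
          }
        }
      }
    }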

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobKill.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobKill.java?rev=1077437&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobKill.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobKill.java Fri Mar  4 04:14:53 2011
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import testjar.JobKillCommitter;
+
+public class TestJobKill {
+  private static final Log LOG = LogFactory.getLog(TestJobKill.class);
+  private JTProtocol wovenClient = null;
+  private static Path outDir = new Path("output");
+  private static Path inDir = new Path("input");
+  private static FileSystem fs = null;
+  private static MRCluster cluster;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setUp();
+    fs = inDir.getFileSystem(cluster.getJTClient().getConf());
+    if(!fs.exists(inDir)){
+      fs.create(inDir);
+    }
+    if (fs.exists(outDir)) {
+      fs.delete(outDir,true);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if(fs.exists(inDir)) {
+      fs.delete(inDir,true);
+    }    
+    if (fs.exists(outDir)) {
+      fs.delete(outDir,true);
+    }
+    cluster.tearDown();
+  }
+
+  /*
+   * The intention of this test case is to verify job failure due to system
+   * exceptions: the exceptions are thrown intentionally and the job is
+   * verified to fail. At the end of the test it is verified that the
+   * success file is not present in the hdfs output location, because the
+   * success file should only exist if the job actually succeeded.
+   *
+   * @throws Exception in case of test errors
+   */
+  @Test
+  public void testSystemJobKill() throws Exception {
+    wovenClient = cluster.getJTClient().getProxy();
+    Configuration conf = new Configuration(cluster.getConf());
+    conf.set("mapred.map.max.attempts", "1");
+    conf.set("mapred.reduce.max.attempts", "1");
+    // fail the mapper job
+    failJob(conf, JobKillCommitter.CommitterWithNoError.class, "JobMapperFail",
+        JobKillCommitter.MapperFail.class, JobKillCommitter.ReducerPass.class,
+        false);
+    // fail the reducer job
+    failJob(conf, JobKillCommitter.CommitterWithNoError.class,
+        "JobReducerFail", JobKillCommitter.MapperPass.class,
+        JobKillCommitter.ReducerFail.class,false);
+    // fail the set up job
+    failJob(conf, JobKillCommitter.CommitterWithFailSetup.class,
+        "JobSetupFail", JobKillCommitter.MapperPass.class,
+        JobKillCommitter.ReducerPass.class,false);
+    // fail the clean up job
+    failJob(conf, JobKillCommitter.CommitterWithFailCleanup.class,
+        "JobCleanupFail", JobKillCommitter.MapperPass.class,
+        JobKillCommitter.ReducerPass.class,false);
+  }
+
+  private void failJob(Configuration conf,
+      Class<? extends OutputCommitter> theClass, String confName,
+      Class<? extends Mapper> mapClass, Class<? extends Reducer> redClass,
+      boolean isUserKill)
+      throws Exception {
+    Job job = new Job(conf, confName);
+    job.setJarByClass(JobKillCommitter.class);
+    job.setMapperClass(mapClass);
+    job.setCombinerClass(redClass);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setReducerClass(redClass);
+    job.setNumReduceTasks(1);
+    FileInputFormat.addInputPath(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+    JobConf jconf = new JobConf(job.getConfiguration(), JobKillCommitter.class);
+    jconf.setOutputCommitter(theClass);
+    if (!isUserKill) {
+      RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+      JobID id = rJob.getID();
+      JobInfo jInfo = wovenClient.getJobInfo(id);
+      Assert.assertTrue("Job is not in PREP state",
+          jInfo.getStatus().getRunState() == JobStatus.PREP);
+    } else {
+      //user kill job
+      RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+      JobInfo info = wovenClient.getJobInfo(rJob.getID());
+      Assert.assertNotNull("Job Info is null",info);
+      JobID id = rJob.getID();
+      while (info.runningMaps() != 1) {
+        Thread.sleep(1000);
+        info = wovenClient.getJobInfo(id);
+      }
+      rJob.killJob();
+    }
+    checkCleanup(jconf);
+    deleteOutputDir();
+  }
+  
+  /**
+   * This test kills the job by explicitly calling the kill API
+   * and makes sure the clean up happens.
+   * @throws Exception
+   */
+  @Test
+  public void testUserJobKill() throws Exception{
+    wovenClient = cluster.getJTClient().getProxy();
+    Configuration conf = new Configuration(cluster.getConf());
+    conf.set("mapred.map.max.attempts", "1");
+    conf.set("mapred.reduce.max.attempts", "1");
+    // fail the mapper job
+    failJob(conf, JobKillCommitter.CommitterWithNoError.class, "JobUserKill",
+        JobKillCommitter.MapperPassSleep.class, 
+        JobKillCommitter.ReducerPass.class,true);    
+  }
+
+  private void checkCleanup(JobConf conf) throws Exception {
+    if (outDir != null) {
+      if (fs.exists(outDir)) {
+        Path filePath = new Path(outDir,
+            FileOutputCommitter.SUCCEEDED_FILE_NAME);
+        // check to make sure the success file is not there since the job
+        // failed.
+        Assert.assertTrue("The success file is present when the job failed",
+            !fs.exists(filePath));
+      }
+    }
+  }
+
+  private void deleteOutputDir() throws Exception {
+    if (fs != null) {
+      fs.delete(outDir, true);
+    }
+  }
+}
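
The cleanup check in checkCleanup relies only on the _SUCCESS marker that FileOutputCommitter writes for committed jobs. A minimal sketch of that check on its own, kept in the org.apache.hadoop.mapred package so the SUCCEEDED_FILE_NAME constant resolves as it does in the test; the class name SuccessMarkerSketch is illustrative.

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SuccessMarkerSketch {
      /**
       * Returns true only if the output directory contains the marker file
       * that FileOutputCommitter creates when a job commits successfully.
       */
      public static boolean jobSucceeded(Configuration conf, Path outDir)
          throws Exception {
        FileSystem fs = outDir.getFileSystem(conf);
        Path marker = new Path(outDir, FileOutputCommitter.SUCCEEDED_FILE_NAME);
        return fs.exists(marker);
      }
    }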

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestPushConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestPushConfig.java?rev=1077437&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestPushConfig.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestPushConfig.java Fri Mar  4 04:14:53 2011
@@ -0,0 +1,145 @@
+package org.apache.hadoop.mapred;
+import java.io.File;
+import java.io.FileOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPushConfig {
+  private static MRCluster cluster;
+  private String localConfDir = "localconf";
+  private static final Log LOG = LogFactory.getLog(
+      TestPushConfig.class.getName());
+  
+  @BeforeClass
+  public static void before() throws Exception {
+    String [] expExcludeList = new String[2];
+    expExcludeList[0] = "java.net.ConnectException";
+    expExcludeList[1] = "java.io.IOException";
+    
+    cluster = MRCluster.createCluster(new Configuration());
+    cluster.setExcludeExpList(expExcludeList);
+    cluster.setUp();
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    cluster.tearDown();
+  }
+  
+  /**
+   * This test covers the pushConfig feature. The pushConfig functionality
+   * is available as part of the cluster process manager. It takes a
+   * local input directory and pushes all of its files to the
+   * remote conf directory. This functionality is required to change the config
+   * on the fly and restart the cluster, and is used by other test cases.
+   * @throws Exception 
+   */
+  @Test
+  public void testPushConfig() throws Exception {
+    final String DUMMY_CONFIG_STRING = "mapred.newdummy.conf";
+    final String DUMMY_CONFIG_STRING_VALUE = "HerriotTestRules";
+    Configuration origconf = new Configuration(cluster.getConf());
+    origconf.set(DUMMY_CONFIG_STRING, DUMMY_CONFIG_STRING_VALUE);
+    String localDir = HadoopDaemonRemoteCluster.getDeployedHadoopConfDir() + 
+        File.separator + localConfDir;
+    File lFile = new File(localDir);
+    if(!lFile.exists()){
+      lFile.mkdir();
+    }
+    String mapredConf = localDir + File.separator + "mapred-site.xml";
+    File file = new File(mapredConf);
+    origconf.writeXml(new FileOutputStream(file));    
+    Configuration daemonConf =  cluster.getJTClient().getProxy().getDaemonConf();
+    Assert.assertTrue("Dummy varialble is expected to be null before restart.",
+        daemonConf.get(DUMMY_CONFIG_STRING) == null);
+    String newDir = cluster.getClusterManager().pushConfig(localDir);
+    cluster.stop();
+    AbstractDaemonClient cli = cluster.getJTClient();
+    // make sure the cluster has actually stopped
+    waitForClusterStop(cli);
+    // restart the daemons with the pushed conf dir
+    cluster.getClusterManager().start(newDir);
+    cli = cluster.getJTClient();
+    // make sure the cluster has actually started
+    waitForClusterStart(cli);
+    Configuration newconf = cluster.getJTClient().getProxy().getDaemonConf();
+    Assert.assertTrue("Extra varialble is expected to be set",
+        newconf.get(DUMMY_CONFIG_STRING).equals(DUMMY_CONFIG_STRING_VALUE));
+    cluster.getClusterManager().stop(newDir);
+    cli = cluster.getJTClient();
+    // make sure the cluster has actually stopped
+    waitForClusterStop(cli);
+    // start the daemons with original conf dir
+    cluster.getClusterManager().start();
+    cli = cluster.getJTClient();    
+    waitForClusterStart(cli);  
+    daemonConf =  cluster.getJTClient().getProxy().getDaemonConf();
+    Assert.assertTrue("Dummy variable is expected to be null after restart.",
+        daemonConf.get(DUMMY_CONFIG_STRING) == null);
+    lFile.delete();
+  }
+  
+  private void waitForClusterStop(AbstractDaemonClient cli) throws Exception {
+    int i=1;
+    while (i < 40) {
+      try {
+        cli.ping();
+        Thread.sleep(1000);
+        i++;
+      } catch (Exception e) {
+        break;
+      }
+    }
+    for (AbstractDaemonClient tcli : cluster.getTTClients()) {
+      i = 1;
+      while (i < 40) {
+        try {
+          tcli.ping();
+          Thread.sleep(1000);
+          i++;
+        } catch (Exception e) {
+          break;
+        }
+      }
+      if (i >= 40) {
+        Assert.fail("TT on " + tcli.getHostName() + " Should have been down.");
+      }
+    }
+  }
+  
+  private void waitForClusterStart(AbstractDaemonClient cli) throws Exception {
+    int i=1;
+    while (i < 40) {
+      try {
+        cli.ping();
+        break;
+      } catch (Exception e) {
+        i++;
+        Thread.sleep(1000);
+        LOG.info("Waiting for Jobtracker on host : "
+            + cli.getHostName() + " to come up.");
+      }
+    }
+    for (AbstractDaemonClient tcli : cluster.getTTClients()) {
+      i = 1;
+      while (i < 40) {
+        try {
+          tcli.ping();
+          break;
+        } catch (Exception e) {
+          i++;
+          Thread.sleep(1000);
+          LOG.info("Waiting for Tasktracker on host : "
+              + tcli.getHostName() + " to come up.");
+        }
+      }
+    }
+  }
+}
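
The only local file manipulation in the pushConfig test is serializing an overridden Configuration into a mapred-site.xml under a local conf directory; pushConfig then copies that directory to the remote conf location before the restart. A small sketch of the local half, with the property name and directory taken from the test above; the class name WriteOverrideConfSketch is illustrative.

    import java.io.File;
    import java.io.FileOutputStream;

    import org.apache.hadoop.conf.Configuration;

    public class WriteOverrideConfSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Dummy override that the restarted daemons are expected to pick up.
        conf.set("mapred.newdummy.conf", "HerriotTestRules");
        File localConfDir = new File("localconf");
        if (!localConfDir.exists()) {
          localConfDir.mkdir();
        }
        FileOutputStream out =
            new FileOutputStream(new File(localConfDir, "mapred-site.xml"));
        // writeXml serializes the configuration in Hadoop's XML property format.
        conf.writeXml(out);
        out.close();
      }
    }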

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java?rev=1077437&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java Fri Mar  4 04:14:53 2011
@@ -0,0 +1,625 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapred.JobClient.NetworkedJob;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * A system test for verifying the status after killing
+ * tasks under different conditions.
+ */
+public class TestTaskKilling {
+  private static final Log LOG = LogFactory.getLog(TestTaskKilling.class);
+  private static MRCluster cluster;
+  private static JobClient jobClient = null;
+  private static JTProtocol remoteJTClient = null;
+
+  public TestTaskKilling() {
+  }
+
+  @BeforeClass
+  public static void before() throws Exception {
+    Configuration conf = new Configuration();
+    cluster = MRCluster.createCluster(conf);
+    cluster.setUp();
+    jobClient = cluster.getJTClient().getClient();
+    remoteJTClient = cluster.getJTClient().getProxy();
+  }
+
+  @AfterClass
+  public static void after() throws Exception {
+    cluster.tearDown();
+  }
+
+  /**
+   * Verifies whether the running job succeeds or not
+   * after failing some of its tasks.
+   */
+  @Test
+  public void testFailedTaskJobStatus() throws IOException, 
+          InterruptedException {
+    Configuration conf = new Configuration(cluster.getConf());
+    TaskInfo taskInfo = null;
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    conf = job.setupJobConf(3, 1, 4000, 4000, 100, 100);
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setMaxMapAttempts(20);
+    jobConf.setMaxReduceAttempts(20);
+    RunningJob runJob = jobClient.submitJob(jobConf);
+    JobID id = runJob.getID();
+    JobInfo jInfo = remoteJTClient.getJobInfo(id);
+    int counter = 0;
+    while (counter < 60) {
+      if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+        break;
+      } else {
+        UtilsForTests.waitFor(1000);
+        jInfo = remoteJTClient.getJobInfo(id);
+      }
+      counter ++;
+    }
+    Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
+
+    TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()) {
+        taskInfo = taskinfo;
+      }
+    }
+
+    counter = 0;
+    taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+    while (counter < 60) {
+      if (taskInfo.getTaskStatus().length > 0) {
+        if (taskInfo.getTaskStatus()[0].getRunState() 
+                == TaskStatus.State.RUNNING) {
+          break;
+        }
+      }
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+      counter++;
+    }
+    Assert.assertTrue("Task has not been started for 1 min.", counter != 60);
+
+    NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
+    TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
+    TaskAttemptID taskAttID = new TaskAttemptID(tID , 0);
+    networkJob.killTask(taskAttID, false);
+
+    LOG.info("Waiting till the job is completed...");
+    while (!jInfo.getStatus().isJobComplete()) {
+      UtilsForTests.waitFor(100);
+      jInfo = remoteJTClient.getJobInfo(id);
+    }
+
+    Assert.assertEquals("JobStatus", jInfo.getStatus().getRunState(), 
+            JobStatus.SUCCEEDED);
+  }
+
+
+  /**
+   * Verifying whether task temporary output directory is cleaned up or not
+   * after killing the task.
+   */
+  @Test
+  public void testDirCleanupAfterTaskKilled() throws IOException, 
+          InterruptedException {
+    TaskInfo taskInfo = null;
+    boolean isTempFolderExists = false;
+    String localTaskDir = null;
+    TTClient ttClient = null;
+    TaskID tID = null;
+    FileStatus filesStatus [] = null;
+    Path inputDir = new Path("input");
+    Path outputDir = new Path("output");
+    Configuration conf = new Configuration(cluster.getConf());
+    JobConf jconf = new JobConf(conf);
+    jconf.setJobName("Word Count");
+    jconf.setJarByClass(WordCount.class);
+    jconf.setMapperClass(WordCount.MapClass.class);
+    jconf.setCombinerClass(WordCount.Reduce.class);
+    jconf.setReducerClass(WordCount.Reduce.class);
+    jconf.setNumMapTasks(1);
+    jconf.setNumReduceTasks(1);
+    jconf.setMaxMapAttempts(20);
+    jconf.setMaxReduceAttempts(20);
+    jconf.setOutputKeyClass(Text.class);
+    jconf.setOutputValueClass(IntWritable.class);
+
+    cleanup(inputDir, conf);
+    cleanup(outputDir, conf);
+    createInput(inputDir, conf);
+    FileInputFormat.setInputPaths(jconf, inputDir);
+    FileOutputFormat.setOutputPath(jconf, outputDir);
+    RunningJob runJob = jobClient.submitJob(jconf);
+    JobID id = runJob.getID();
+    JobInfo jInfo = remoteJTClient.getJobInfo(id);
+    int counter = 0;
+    while (counter < 60) {
+      if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+        break;
+      } else {
+        UtilsForTests.waitFor(1000);
+        jInfo = remoteJTClient.getJobInfo(id);
+      }
+      counter ++;
+    }
+    Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
+
+    JobStatus[] jobStatus = jobClient.getAllJobs();
+    String userName = jobStatus[0].getUsername();
+    TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()) {
+        taskInfo = taskinfo;
+        break;
+      }
+    }
+
+    counter = 0;
+    while (counter < 30) {
+      if (taskInfo.getTaskStatus().length > 0) {
+        if (taskInfo.getTaskStatus()[0].getRunState() 
+                == TaskStatus.State.RUNNING) {
+          break;
+        }
+      }
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+      counter ++;
+    }
+    Assert.assertTrue("Task has not been started for 30 sec.", 
+            counter != 30);
+
+    tID = TaskID.downgrade(taskInfo.getTaskID());
+    FinishTaskControlAction action = new FinishTaskControlAction(tID);
+
+    String[] taskTrackers = taskInfo.getTaskTrackers();
+    counter = 0;
+    while (counter < 30) {
+      if (taskTrackers.length != 0) {
+        break;
+      }
+      UtilsForTests.waitFor(100);
+      taskTrackers = taskInfo.getTaskTrackers();
+      counter ++;
+    }
+
+    String hostName = taskTrackers[0].split("_")[1];
+    hostName = hostName.split(":")[0];
+    ttClient = cluster.getTTClient(hostName);
+    ttClient.getProxy().sendAction(action);
+    String localDirs[] = ttClient.getMapredLocalDirs();
+    TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
+    for (String localDir : localDirs) {
+      localTaskDir = localDir + "/" 
+              + TaskTracker.getLocalTaskDir(userName, 
+                      id.toString(), taskAttID.toString());
+      filesStatus = ttClient.listStatus(localTaskDir, true);
+      if (filesStatus.length > 0) {
+        isTempFolderExists = true;
+        NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
+        networkJob.killTask(taskAttID, false);
+        break;
+      }
+    }
+
+    Assert.assertTrue("Task Attempt directory " + 
+            taskAttID + " has not been found while task was running.", 
+                    isTempFolderExists);
+    taskInfo = remoteJTClient.getTaskInfo(tID);
+
+    counter = 0;
+    while (counter < 60) {
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(tID);
+      filesStatus = ttClient.listStatus(localTaskDir, true);
+      if (filesStatus.length == 0) {
+        break;
+      }
+      counter ++;
+    }
+
+    Assert.assertTrue("Task attempt temporary folder has not been cleaned.", 
+            isTempFolderExists && filesStatus.length == 0);
+    counter = 0;
+    while (counter < 30) {
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(tID);
+      counter ++;
+    }
+    taskInfo = remoteJTClient.getTaskInfo(tID);
+    Assert.assertEquals("Task status has not been changed to KILLED.", 
+            TaskStatus.State.KILLED, 
+                    taskInfo.getTaskStatus()[0].getRunState());
+  }
+
+  private void cleanup(Path dir, Configuration conf) throws 
+          IOException {
+    FileSystem fs = dir.getFileSystem(conf);
+    fs.delete(dir, true);
+  }
+
+  private void createInput(Path inDir, Configuration conf) throws 
+          IOException {
+    String input = "Hadoop is framework for data intensive distributed " 
+            + "applications.\n" 
+            + "Hadoop enables applications to work with thousands of nodes.";
+    FileSystem fs = inDir.getFileSystem(conf);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Failed to create the input directory:" 
+            + inDir.toString());
+    }
+    fs.setPermission(inDir, new FsPermission(FsAction.ALL, 
+            FsAction.ALL, FsAction.ALL));
+    DataOutputStream file = fs.create(new Path(inDir, "data.txt"));
+    int i = 0;
+    while(i < 1000 * 3000) {
+      file.writeBytes(input);
+      i++;
+    }
+    file.close();
+  }
+
+  /**
+   * Verifying whether task temporary output directory is cleaned up or not
+   * after failing the task.
+   */
+  @Test
+  public void testDirCleanupAfterTaskFailed() throws IOException, 
+          InterruptedException {
+    TTClient ttClient = null;
+    FileStatus filesStatus [] = null;
+    String localTaskDir = null;
+    TaskInfo taskInfo = null;
+    TaskID tID = null;
+    boolean isTempFolderExists = false;
+    Path inputDir = new Path("input");
+    Path outputDir = new Path("output");
+    Configuration conf = new Configuration(cluster.getConf());
+    JobConf jconf = new JobConf(conf);
+    jconf.setJobName("Task Failed job");
+    jconf.setJarByClass(UtilsForTests.class);
+    jconf.setMapperClass(FailedMapperClass.class);
+    jconf.setNumMapTasks(1);
+    jconf.setNumReduceTasks(0);
+    jconf.setMaxMapAttempts(1);
+    cleanup(inputDir, conf);
+    cleanup(outputDir, conf);
+    createInput(inputDir, conf);
+    FileInputFormat.setInputPaths(jconf, inputDir);
+    FileOutputFormat.setOutputPath(jconf, outputDir);
+    RunningJob runJob = jobClient.submitJob(jconf);
+    JobID id = runJob.getID();
+    JobInfo jInfo = remoteJTClient.getJobInfo(id);
+    
+    int counter = 0;
+    while (counter < 60) {
+      if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) {
+        break;
+      } else {
+        UtilsForTests.waitFor(1000);
+        jInfo = remoteJTClient.getJobInfo(id);
+      }
+      counter ++;
+    }
+    Assert.assertTrue("Job has not been started for 1 min.", counter != 60);
+
+    JobStatus[] jobStatus = jobClient.getAllJobs();
+    String userName = jobStatus[0].getUsername();
+    TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id);
+    for (TaskInfo taskinfo : taskInfos) {
+      if (!taskinfo.isSetupOrCleanup()) {
+        taskInfo = taskinfo;
+        break;
+      }
+    }
+
+    tID = TaskID.downgrade(taskInfo.getTaskID());
+    FinishTaskControlAction action = new FinishTaskControlAction(tID);
+    String[] taskTrackers = taskInfo.getTaskTrackers();
+    counter = 0;
+    while (counter < 30) {
+      if (taskTrackers.length != 0) {
+        break;
+      }
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+      taskTrackers = taskInfo.getTaskTrackers();
+      counter ++;
+    }
+    Assert.assertTrue("Task tracker not found.", taskTrackers.length != 0);
+    String hostName = taskTrackers[0].split("_")[1];
+    hostName = hostName.split(":")[0];
+    ttClient = cluster.getTTClient(hostName);
+    ttClient.getProxy().sendAction(action);
+
+    counter = 0;
+    while(counter < 60) {
+      if (taskInfo.getTaskStatus().length > 0) {
+        if (taskInfo.getTaskStatus()[0].getRunState() 
+                == TaskStatus.State.RUNNING) {
+          break;
+        }
+      }
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+      counter ++;
+    }
+    Assert.assertTrue("Task has not been started for 1 min.", 
+            counter != 60);
+
+    String localDirs[] = ttClient.getMapredLocalDirs();
+    TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
+    for (String localDir : localDirs) {
+      localTaskDir = localDir + "/" 
+              + TaskTracker.getLocalTaskDir(userName, 
+                      id.toString(), taskAttID.toString());
+      filesStatus = ttClient.listStatus(localTaskDir, true);
+      if (filesStatus.length > 0) {
+        isTempFolderExists = true;
+        break;
+      }
+    }
+
+    taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
+    Assert.assertTrue("Task Attempt directory " + 
+            taskAttID + " has not been found while task was running.", 
+                    isTempFolderExists);
+    counter = 0;
+    while (counter < 30) {
+      UtilsForTests.waitFor(1000);
+      taskInfo = remoteJTClient.getTaskInfo(tID);
+      counter ++;
+    }
+
+    Assert.assertEquals("Task status has not been changed to FAILED.", 
+            taskInfo.getTaskStatus()[0].getRunState(), 
+                    TaskStatus.State.FAILED);
+
+    filesStatus = ttClient.listStatus(localTaskDir, true);
+    Assert.assertTrue("Temporary folder has not been cleanup.", 
+            filesStatus.length == 0);
+  }
+
+  public static class FailedMapperClass implements 
+          Mapper<NullWritable, NullWritable, NullWritable, NullWritable> {
+    public void configure(JobConf job) {
+    }
+    public void map(NullWritable key, NullWritable value, 
+            OutputCollector<NullWritable, NullWritable> output, 
+                    Reporter reporter) throws IOException {
+      int counter = 0;
+      while (counter < 240) {
+        UtilsForTests.waitFor(1000);
+        counter ++;
+      }
+      if (counter == 240) {
+        throw new IOException();
+      }
+    }
+    public void close() {
+    }
+  }
+  
+  /**
+   * Verifies that a job fails when all task attempts of one of
+   * its tasks are killed.
+   */
+  @Test
+  public void testAllTaskAttemptKill() throws Exception {
+    Configuration conf = new Configuration(cluster.getConf());
+
+    JobStatus[] jobStatus = null;
+
+    SleepJob job = new SleepJob();
+    job.setConf(conf);
+    conf = job.setupJobConf(3, 1, 40000, 1000, 100, 100);
+    JobConf jconf = new JobConf(conf);
+
+    //Submitting the job
+    RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+
+    int MAX_MAP_TASK_ATTEMPTS = Integer.
+        parseInt(jconf.get("mapred.map.max.attempts"));
+
+    LOG.info("MAX_MAP_TASK_ATTEMPTS is : " + MAX_MAP_TASK_ATTEMPTS);
+
+    Assert.assertTrue(MAX_MAP_TASK_ATTEMPTS > 0);
+
+    TTClient tClient = null;
+    TTClient[] ttClients = null;
+
+    JobInfo jInfo = remoteJTClient.getJobInfo(rJob.getID());
+
+    //Assert if jobInfo is null
+    Assert.assertNotNull(jInfo.getStatus().getRunState());
+
+    //Wait for the job to start running.
+    while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {};
+      jInfo = remoteJTClient.getJobInfo(rJob.getID());
+    }
+
+    //Temporarily store the jobid to use it later for comparison.
+    JobID jobidStore = rJob.getID();
+    jobidStore = JobID.downgrade(jobidStore);
+    LOG.info("job id is :" + jobidStore.toString());
+
+    TaskInfo[] taskInfos = null;
+
+    //After making sure that the job is running,
+    //the test execution has to make sure that
+    //at least one task has started running before continuing.
+    boolean runningCount = false;
+    int count = 0;
+    do {
+      taskInfos = cluster.getJTClient().getProxy()
+        .getTaskInfo(rJob.getID());
+      runningCount = false;
+      for (TaskInfo taskInfo : taskInfos) {
+        TaskStatus[] taskStatuses = taskInfo.getTaskStatus();
+        if (taskStatuses.length > 0){
+          LOG.info("taskStatuses[0].getRunState() is :" +
+            taskStatuses[0].getRunState());
+          if (taskStatuses[0].getRunState() == TaskStatus.State.RUNNING){
+            runningCount = true;
+            break;
+          } else {
+            LOG.info("Sleeping 5 seconds");
+            Thread.sleep(5000);
+          }
+        }
+      }
+      count++;
+      //If the count goes beyond a point, fail; this avoids an
+      //infinite loop under unforeseen circumstances. The testcase would
+      //anyway fail later.
+      if (count > 10) {
+        Assert.fail("Since the sleep count has gone beyond a point, " +
+          "failing at this point");
+      } 
+    } while (!runningCount);
+
+    //This whole module is about getting the task Attempt id
+    //of one task and killing it MAX_MAP_TASK_ATTEMPTS times,
+    //whenever it re-attempts to run.
+    String taskIdKilled = null;
+    for (int i = 0 ; i<MAX_MAP_TASK_ATTEMPTS; i++) {
+      taskInfos = cluster.getJTClient().getProxy()
+          .getTaskInfo(rJob.getID());
+
+      for (TaskInfo taskInfo : taskInfos) {
+        TaskAttemptID taskAttemptID;
+        if (!taskInfo.isSetupOrCleanup()) {
+          //This is the task which is going to be killed continuously in
+          //all its task attempts. The first such task is picked up.
+          TaskID taskid = TaskID.downgrade(taskInfo.getTaskID());
+          LOG.info("taskid is :" + taskid);
+          if (i==0) {
+            taskIdKilled = taskid.toString();
+            taskAttemptID = new TaskAttemptID(taskid, i);
+            LOG.info("taskAttemptid going to be killed is : " + taskAttemptID);
+            (jobClient.new NetworkedJob(jInfo.getStatus())).
+                killTask(taskAttemptID,true);
+            checkTaskCompletionEvent(taskAttemptID, jInfo);
+            break;
+          } else {
+            if (taskIdKilled.equals(taskid.toString())) {
+              taskAttemptID = new TaskAttemptID(taskid, i);
+              LOG.info("taskAttemptid going to be killed is : " +
+                  taskAttemptID);
+              (jobClient.new NetworkedJob(jInfo.getStatus())).
+                  killTask(taskAttemptID,true);
+              checkTaskCompletionEvent(taskAttemptID,jInfo);
+              break;
+            }
+          }
+        }
+      }
+    }
+    //Making sure that the job is complete.
+    while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+      Thread.sleep(10000);
+      jInfo = remoteJTClient.getJobInfo(rJob.getID());
+    }
+
+    //Making sure that the correct job status is obtained from all the jobs
+    jobStatus = jobClient.getAllJobs();
+    JobStatus jobStatusFound = null;
+    for (JobStatus jobStatusTmp : jobStatus) {
+      if (JobID.downgrade(jobStatusTmp.getJobID()).equals(jobidStore)) {
+        jobStatusFound = jobStatusTmp;
+        LOG.info("jobStatus found is :" + jobStatusFound.getJobId().toString());
+      }
+    }
+
+    //Making sure that the job has FAILED
+    Assert.assertEquals("The job should have failed at this stage",
+        JobStatus.FAILED,jobStatusFound.getRunState());
+  }
+
+  //This method checks if the task attempt id occurs in the list
+  //of tasks that are completed (killed) for a job. This is
+  //required because after issuing a kill command, the task
+  //has to be killed and appear in the taskCompletion event.
+  //After this a new task attempt will start running in a
+  //matter of a few seconds.
+  public void checkTaskCompletionEvent (TaskAttemptID taskAttemptID,
+      JobInfo jInfo) throws Exception {
+    boolean match = false;
+    int count = 0;
+    while (!match) {
+      TaskCompletionEvent[] taskCompletionEvents =  jobClient.new
+        NetworkedJob(jInfo.getStatus()).getTaskCompletionEvents(0);
+      for (TaskCompletionEvent taskCompletionEvent : taskCompletionEvents) {
+        if ((taskCompletionEvent.getTaskAttemptId().toString()).
+            equals(taskAttemptID.toString())){
+          match = true;
+          //Sleeping for 10 seconds giving time for the next task
+          //attempt to run
+          Thread.sleep(10000);
+          break;
+        }
+      }
+      if (!match) {
+        LOG.info("Thread is sleeping for 10 seconds");
+        Thread.sleep(10000);
+        count++;
+      }
+      //If the count goes beyond a point, fail the test. This avoids an
+      //infinite loop under unforeseen circumstances; the testcase would
+      //fail later anyway.
+      if (count > 10) {
+        Assert.fail("Since the task attemptid is not appearing in the" +
+            "TaskCompletionEvent, it seems this task attempt was not killed");
+      }
+    }
+  }
+}
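
For reference, the kill-and-wait pattern the test above relies on boils down to a small helper against the stable RunningJob interface (which JobClient.NetworkedJob implements). This is only an illustrative sketch; the poll interval and retry limit below are arbitrary values, not taken from the test.

    import java.io.IOException;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.mapred.TaskAttemptID;
    import org.apache.hadoop.mapred.TaskCompletionEvent;

    class KillAttemptHelper {
      /** Kills the given attempt and waits until it shows up in the job's
       *  task completion events, giving up after a bounded number of polls. */
      static void killAndWait(RunningJob job, TaskAttemptID attempt)
          throws IOException, InterruptedException {
        job.killTask(attempt, true);        // shouldFail=true: mark the attempt as failed
        for (int polls = 0; polls < 10; polls++) {
          for (TaskCompletionEvent event : job.getTaskCompletionEvents(0)) {
            if (event.getTaskAttemptId().equals(attempt)) {
              return;                       // the attempt is now reported as completed
            }
          }
          Thread.sleep(10000);              // give the JobTracker time to report the kill
        }
        throw new IOException("Attempt " + attempt + " never showed up in completion events");
      }
    }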

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java?rev=1077437&r1=1077436&r2=1077437&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java Fri Mar  4 04:14:53 2011
@@ -19,40 +19,22 @@
 package org.apache.hadoop.mapred;
 
 import java.io.BufferedReader;
-import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.StringTokenizer;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-
-import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.examples.WordCount.IntSumReducer;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapred.TextOutputFormat;
-
-import org.apache.hadoop.mapreduce.test.system.JTClient;
 import org.apache.hadoop.mapreduce.test.system.MRCluster;
-import org.apache.hadoop.mapreduce.test.system.TTClient;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.io.Text;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -73,7 +55,11 @@ public class TestTaskOwner {
     cluster = MRCluster.createCluster(new Configuration());
     cluster.setUp();
     FileSystem fs = inDir.getFileSystem(cluster.getJTClient().getConf());
-    fs.create(inDir);
+    // Make sure that all is clean in case last tearDown wasn't successful
+    fs.delete(outDir, true);
+    fs.delete(inDir, true);
+
+    fs.create(inDir, true);
   }
 
   @Test
@@ -104,7 +90,6 @@ public class TestTaskOwner {
     // as the
     // user name that was used to launch the task in the first place
     FileSystem fs = outDir.getFileSystem(conf);
-    StringBuffer result = new StringBuffer();
 
     Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
      new Utils.OutputFileUtils.OutputFilesFilter()));
@@ -128,20 +113,18 @@ public class TestTaskOwner {
               .toString());
            break;
          }
-
         }
         file.close();
      }
-
   }
 
   @AfterClass
   public static void tearDown() throws java.lang.Exception {
     FileSystem fs = outDir.getFileSystem(cluster.getJTClient().getConf());
     fs.delete(outDir, true);
+    fs.delete(inDir, true);
     cluster.tearDown();
    }
-
 }
 
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java?rev=1077437&r1=1077436&r2=1077437&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java Fri Mar  4 04:14:53 2011
@@ -32,30 +32,42 @@ public interface JTProtocol extends Daem
 
   /**
    * Get the information pertaining to given job.<br/>
+   * The returned JobInfo object can be null when the
+   * job specified by the job id has been retired from the
+   * JobTracker's memory, which happens after the job is
+   * completed. <br/>
    * 
    * @param id
    *          of the job for which information is required.
-   * @return information of regarding job.
+   * @return information regarding the job, or null if the job is
+   *         retired from the JobTracker's memory.
    * @throws IOException
    */
   public JobInfo getJobInfo(JobID jobID) throws IOException;
 
   /**
    * Gets the information pertaining to a task. <br/>
-   * 
+   * The returned TaskInfo object can be null when the
+   * task specified by the task id has been retired
+   * from the JobTracker's memory, which happens after the
+   * job is completed. <br/>
    * @param id
    *          of the task for which information is required.
-   * @return information of regarding the task.
+   * @return information regarding the task, or null if the
+   *          task is retired from the JobTracker's memory.
    * @throws IOException
    */
   public TaskInfo getTaskInfo(TaskID taskID) throws IOException;
 
   /**
    * Gets the information pertaining to a given TaskTracker. <br/>
-   * 
+   * The returned TTInfo object can be null if the given TaskTracker's
+   * information has been removed from the JobTracker's memory, which is done
+   * when the TaskTracker is marked lost by the JobTracker. <br/>
    * @param name
    *          of the tracker.
-   * @return information regarding the tracker.
+   * @return information regarding the tracker, or null if the TaskTracker
+   *          is marked lost by the JobTracker.
    * @throws IOException
    */
   public TTInfo getTTInfo(String trackerName) throws IOException;
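
Since getJobInfo() (and likewise getTaskInfo()) may now legitimately return null once the job is retired, polling callers should treat null as a terminal state. A minimal sketch of such a loop, assuming a JTProtocol proxy like the one the tests obtain via cluster.getJTClient().getProxy(); the 30-poll bound is an arbitrary illustrative limit.

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapreduce.test.system.JTProtocol;
    import org.apache.hadoop.mapreduce.test.system.JobInfo;

    class JobCompletionWaiter {
      /** Polls the JobTracker until the job completes or is retired (null JobInfo). */
      static void waitForCompletion(JTProtocol proxy, JobID jobId)
          throws IOException, InterruptedException {
        for (int polls = 0; polls < 30; polls++) {
          JobInfo info = proxy.getJobInfo(jobId);
          if (info == null || info.getStatus().isJobComplete()) {
            return;    // either retired from JobTracker memory or reported complete
          }
          Thread.sleep(10000);
        }
        throw new IOException("Job " + jobId + " did not complete in time");
      }
    }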

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java?rev=1077437&r1=1077436&r2=1077437&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java Fri Mar  4 04:14:53 2011
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.test.system;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -30,7 +31,9 @@ import org.apache.hadoop.test.system.Abs
 import org.apache.hadoop.test.system.AbstractDaemonCluster;
 import org.apache.hadoop.test.system.process.ClusterProcessManager;
 import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster;
+import org.apache.hadoop.test.system.process.MultiUserHadoopDaemonRemoteCluster;
 import org.apache.hadoop.test.system.process.RemoteProcess;
+import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster.HadoopDaemonInfo;
 
 /**
  * Concrete AbstractDaemonCluster representing a Map-Reduce cluster.
@@ -44,21 +47,23 @@ public class MRCluster extends AbstractD
     "test.system.mr.clusterprocess.impl.class";
 
   /**
-   * Key is used to to point to the file containing hostname of the jobtracker
-   */
-  public static final String CONF_HADOOP_JT_HOSTFILE_NAME =
-    "test.system.hdrc.jt.hostfile";
-  /**
    * Key is used to to point to the file containing hostnames of tasktrackers
    */
   public static final String CONF_HADOOP_TT_HOSTFILE_NAME =
     "test.system.hdrc.tt.hostfile";
 
-  private static String JT_hostFileName;
+  private static List<HadoopDaemonInfo> mrDaemonInfos = 
+    new ArrayList<HadoopDaemonInfo>();
   private static String TT_hostFileName;
+  private static String jtHostName;
 
   protected enum Role {JT, TT};
-  
+
+  static{
+    Configuration.addDefaultResource("mapred-default.xml");
+    Configuration.addDefaultResource("mapred-site.xml");
+  }
+
   private MRCluster(Configuration conf, ClusterProcessManager rCluster)
       throws IOException {
     super(conf, rCluster);
@@ -74,14 +79,20 @@ public class MRCluster extends AbstractD
    */
   public static MRCluster createCluster(Configuration conf) 
       throws Exception {
-    JT_hostFileName = conf.get(CONF_HADOOP_JT_HOSTFILE_NAME,
-      System.getProperty(CONF_HADOOP_JT_HOSTFILE_NAME,
-        "clusterControl.masters.jt"));
-    TT_hostFileName = conf.get(CONF_HADOOP_TT_HOSTFILE_NAME,
-      System.getProperty(CONF_HADOOP_TT_HOSTFILE_NAME, "slaves"));
-
-    String implKlass = conf.get(CLUSTER_PROCESS_MGR_IMPL, System
-        .getProperty(CLUSTER_PROCESS_MGR_IMPL));
+    conf.addResource("system-test.xml");
+    TT_hostFileName = conf.get(CONF_HADOOP_TT_HOSTFILE_NAME, "slaves");
+    String jtHostPort = conf.get("mapred.job.tracker");
+    if (jtHostPort == null) {
+      throw new Exception("mapred.job.tracker is not set.");
+    }
+    jtHostName = jtHostPort.trim().split(":")[0];
+    
+    mrDaemonInfos.add(new HadoopDaemonInfo("jobtracker", 
+        Role.JT, Arrays.asList(new String[]{jtHostName})));
+    mrDaemonInfos.add(new HadoopDaemonInfo("tasktracker", 
+        Role.TT, TT_hostFileName));
+    
+    String implKlass = conf.get(CLUSTER_PROCESS_MGR_IMPL);
     if (implKlass == null || implKlass.isEmpty()) {
       implKlass = MRProcessManager.class.getName();
     }
@@ -145,12 +156,15 @@ public class MRCluster extends AbstractD
   }
 
   public static class MRProcessManager extends HadoopDaemonRemoteCluster{
-    private static final List<HadoopDaemonInfo> mrDaemonInfos = 
-      Arrays.asList(new HadoopDaemonInfo[]{
-          new HadoopDaemonInfo("jobtracker", Role.JT, JT_hostFileName),
-          new HadoopDaemonInfo("tasktracker", Role.TT, TT_hostFileName)});
     public MRProcessManager() {
       super(mrDaemonInfos);
     }
   }
+
+  public static class MultiMRProcessManager
+      extends MultiUserHadoopDaemonRemoteCluster {
+    public MultiMRProcessManager() {
+      super(mrDaemonInfos);
+    }
+  }
 }
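
With this change the JobTracker host is no longer read from a separate JT host file; createCluster() derives it from mapred.job.tracker, which normally arrives via mapred-site.xml / system-test.xml on the classpath. A rough usage sketch; the host:port value set below is only a placeholder to keep the snippet self-contained.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.test.system.MRCluster;

    class MRClusterUsage {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder only: in a real deployment this value comes from the
        // cluster's mapred-site.xml / system-test.xml rather than from code.
        conf.set("mapred.job.tracker", "jthost.example.com:9001");
        MRCluster cluster = MRCluster.createCluster(conf);
        cluster.setUp();                    // connect clients to the remote daemons
        try {
          // ... drive system tests through cluster.getJTClient() etc. ...
        } finally {
          cluster.tearDown();
        }
      }
    }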

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java?rev=1077437&r1=1077436&r2=1077437&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java Fri Mar  4 04:14:53 2011
@@ -18,17 +18,26 @@
 
 package org.apache.hadoop.mapreduce.test.system;
 
-import java.io.IOException;
-
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskTrackerStatus;
 import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.test.system.DaemonProtocol;
 
+import java.io.IOException;
+
 /**
  * TaskTracker RPC interface to be used for cluster tests.
+ *
+ * The protocol has to be annotated so KerberosInfo can be filled in during
+ * creation of an ipc.Client connection
  */
+@KerberosInfo(
+    serverPrincipal = TaskTracker.TT_USER_NAME)
+@TokenInfo(JobTokenSelector.class)
 public interface TTProtocol extends DaemonProtocol {
 
   public static final long versionID = 1L;
@@ -36,8 +45,8 @@ public interface TTProtocol extends Daem
    * Gets latest status which was sent in heartbeat to the {@link JobTracker}. 
    * <br/>
    * 
-   * @return status
-   * @throws IOException
+   * @return status of the TaskTracker daemon
+   * @throws IOException in case of errors
    */
   TaskTrackerStatus getStatus() throws IOException;
 
@@ -45,17 +54,28 @@ public interface TTProtocol extends Daem
    * Gets list of all the tasks in the {@link TaskTracker}.<br/>
    * 
    * @return list of all the tasks
-   * @throws IOException
+   * @throws IOException in case of errors
    */
   TTTaskInfo[] getTasks() throws IOException;
 
   /**
    * Gets the task associated with the id.<br/>
    * 
-   * @param id of the task.
+   * @param taskID of the task.
    * 
-   * @return
-   * @throws IOException
+   * @return the task info <code>TTTaskInfo</code>
+   * @throws IOException in case of errors
    */
   TTTaskInfo getTask(TaskID taskID) throws IOException;
+
+  /**
+   * Checks whether any process in the process tree of the task is
+   * alive. <br/>
+   * 
+   * @param pid
+   *          of the task attempt
+   * @return true if task process tree is alive.
+   * @throws IOException in case of errors
+   */
+  boolean isProcessTreeAlive(String pid) throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java?rev=1077437&r1=1077436&r2=1077437&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java Fri Mar  4 04:14:53 2011
@@ -68,4 +68,11 @@ public interface TTTaskInfo extends Writ
    * @return true if it is a clean up of task.
    */
   boolean isTaskCleanupTask();
-}
\ No newline at end of file
+
+  /**
+   * Gets the pid of the running task on the task-tracker.
+   * 
+   * @return pid of the task.
+   */
+  String getPid();
+}
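
Together with the new TTProtocol.isProcessTreeAlive(pid), getPid() lets a test verify that no task process survives on a TaskTracker, for instance after a kill. A small sketch against the TTProtocol proxy; how the proxy is obtained (typically from a TTClient) is left out here.

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.test.system.TTProtocol;
    import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;

    class ProcessTreeCheck {
      /** Returns true if any task on the TaskTracker still has a live process tree. */
      static boolean anyTaskProcessAlive(TTProtocol ttProxy) throws IOException {
        for (TTTaskInfo task : ttProxy.getTasks()) {
          String pid = task.getPid();
          if (pid != null && !pid.isEmpty() && ttProxy.isProcessTreeAlive(pid)) {
            return true;    // at least one task's process tree is still running
          }
        }
        return false;
      }
    }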

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java?rev=1077437&r1=1077436&r2=1077437&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java Fri Mar  4 04:14:53 2011
@@ -22,9 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.List;
-
 import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -224,67 +222,73 @@ public abstract class AbstractDaemonClie
   * Gets number of times FATAL log messages were logged in Daemon logs. 
    * <br/>
    * Pattern used for searching is FATAL. <br/>
-   * 
+   * @param excludeExpList list of exceptions to exclude 
    * @return number of occurrence of fatal message.
    * @throws IOException
    */
-  public int getNumberOfFatalStatementsInLog() throws IOException {
+  public int getNumberOfFatalStatementsInLog(String [] excludeExpList)
+      throws IOException {
     DaemonProtocol proxy = getProxy();
     String pattern = "FATAL";
-    return proxy.getNumberOfMatchesInLogFile(pattern);
+    return proxy.getNumberOfMatchesInLogFile(pattern, excludeExpList);
   }
 
   /**
   * Gets number of times ERROR log messages were logged in Daemon logs. 
    * <br/>
    * Pattern used for searching is ERROR. <br/>
-   * 
+   * @param excludeExpList list of exceptions to exclude 
    * @return number of occurrence of error message.
    * @throws IOException
    */
-  public int getNumberOfErrorStatementsInLog() throws IOException {
+  public int getNumberOfErrorStatementsInLog(String[] excludeExpList) 
+      throws IOException {
     DaemonProtocol proxy = getProxy();
-    String pattern = "ERROR";
-    return proxy.getNumberOfMatchesInLogFile(pattern);
+    String pattern = "ERROR";    
+    return proxy.getNumberOfMatchesInLogFile(pattern, excludeExpList);
   }
 
   /**
   * Gets number of times Warning log messages were logged in Daemon logs. 
    * <br/>
    * Pattern used for searching is WARN. <br/>
-   * 
+   * @param excludeExpList list of exceptions to exclude 
    * @return number of occurrence of warning message.
    * @throws IOException
    */
-  public int getNumberOfWarnStatementsInLog() throws IOException {
+  public int getNumberOfWarnStatementsInLog(String[] excludeExpList) 
+      throws IOException {
     DaemonProtocol proxy = getProxy();
     String pattern = "WARN";
-    return proxy.getNumberOfMatchesInLogFile(pattern);
+    return proxy.getNumberOfMatchesInLogFile(pattern, excludeExpList);
   }
 
   /**
   * Gets number of times the given Exception was present in the log file. <br/>
    * 
    * @param e exception class.
+   * @param excludeExpList list of exceptions to exclude. 
    * @return number of exceptions in log
    * @throws IOException
    */
-  public int getNumberOfExceptionsInLog(Exception e)
-      throws IOException {
+  public int getNumberOfExceptionsInLog(Exception e,
+      String[] excludeExpList) throws IOException {
     DaemonProtocol proxy = getProxy();
-    String pattern = e.getClass().getSimpleName();
-    return proxy.getNumberOfMatchesInLogFile(pattern);
+    String pattern = e.getClass().getSimpleName();    
+    return proxy.getNumberOfMatchesInLogFile(pattern, excludeExpList);
   }
 
   /**
    * Number of times ConcurrentModificationException present in log file. 
    * <br/>
+   * @param excludeExpList list of exceptions to exclude.
    * @return number of times exception in log file.
    * @throws IOException
    */
-  public int getNumberOfConcurrentModificationExceptionsInLog()
-      throws IOException {
-    return getNumberOfExceptionsInLog(new ConcurrentModificationException());
+  public int getNumberOfConcurrentModificationExceptionsInLog(
+      String[] excludeExpList) throws IOException {
+    return getNumberOfExceptionsInLog(new ConcurrentModificationException(),
+        excludeExpList);
   }
 
   private int errorCount;
@@ -294,16 +298,17 @@ public abstract class AbstractDaemonClie
   /**
    * Populate the initial exception counts to be used to assert once a testcase
    * is done there was no exception in the daemon when testcase was run.
-   * 
+   * @param excludeExpList list of exceptions to exclude
    * @throws IOException
    */
-  protected void populateExceptionCount() throws IOException {
-    errorCount = getNumberOfErrorStatementsInLog();
+  protected void populateExceptionCount(String [] excludeExpList) 
+      throws IOException {
+    errorCount = getNumberOfErrorStatementsInLog(excludeExpList);
     LOG.info("Number of error messages in logs : " + errorCount);
-    fatalCount = getNumberOfFatalStatementsInLog();
+    fatalCount = getNumberOfFatalStatementsInLog(excludeExpList);
     LOG.info("Number of fatal statement in logs : " + fatalCount);
     concurrentExceptionCount =
-        getNumberOfConcurrentModificationExceptionsInLog();
+        getNumberOfConcurrentModificationExceptionsInLog(excludeExpList);
     LOG.info("Number of concurrent modification in logs : "
         + concurrentExceptionCount);
   }
@@ -314,16 +319,18 @@ public abstract class AbstractDaemonClie
    * <b><i>
    * Pre-req for the method is that populateExceptionCount() has 
    * to be called before calling this method.</b></i>
+   * @param excludeExpList list of exceptions to exclude
    * @throws IOException
    */
-  protected void assertNoExceptionsOccurred() throws IOException {
-    int newerrorCount = getNumberOfErrorStatementsInLog();
-    LOG.info("Number of error messages while asserting : " + newerrorCount);
-    int newfatalCount = getNumberOfFatalStatementsInLog();
+  protected void assertNoExceptionsOccurred(String [] excludeExpList) 
+      throws IOException {
+    int newerrorCount = getNumberOfErrorStatementsInLog(excludeExpList);
+    LOG.info("Number of error messages while asserting :" + newerrorCount);
+    int newfatalCount = getNumberOfFatalStatementsInLog(excludeExpList);
     LOG.info("Number of fatal messages while asserting : " + newfatalCount);
     int newconcurrentExceptionCount =
-        getNumberOfConcurrentModificationExceptionsInLog();
-    LOG.info("Number of concurrentmodification execption while asserting :"
+        getNumberOfConcurrentModificationExceptionsInLog(excludeExpList);
+    LOG.info("Number of concurrentmodification exception while asserting :"
         + newconcurrentExceptionCount);
     Assert.assertEquals(
         "New Error Messages logged in the log file", errorCount, newerrorCount);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java?rev=1077437&r1=1077436&r2=1077437&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java Fri Mar  4 04:14:53 2011
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -37,7 +36,7 @@ import org.apache.hadoop.test.system.pro
 public abstract class AbstractDaemonCluster {
 
   private static final Log LOG = LogFactory.getLog(AbstractDaemonCluster.class);
-
+  private String[] excludeExpList;
   private Configuration conf;
   protected ClusterProcessManager clusterManager;
   private Map<Enum<?>, List<AbstractDaemonClient>> daemons = 
@@ -60,6 +59,17 @@ public abstract class AbstractDaemonClus
     createAllClients();
   }
 
+  /**
+   * The method returns the cluster manager. The system test cases require an
+   * instance of HadoopDaemonRemoteCluster to invoke certain operations on the
+   * daemons.
+   * 
+   * @return instance of clusterManager
+   */
+  public ClusterProcessManager getClusterManager() {
+    return clusterManager;
+  }
+
   protected void createAllClients() throws IOException {
     for (RemoteProcess p : clusterManager.getAllProcesses()) {
       List<AbstractDaemonClient> dms = daemons.get(p.getRole());
@@ -131,14 +141,18 @@ public abstract class AbstractDaemonClus
   }
 
   protected void waitForDaemon(AbstractDaemonClient d) {
+    final int TEN_SEC = 10000;
     while(true) {
       try {
-        LOG.info("Waiting for daemon in host to come up : " + d.getHostName());
+        LOG.info("Waiting for daemon at " + d.getHostName() + " to come up.");
+        LOG.info("Daemon might not be " +
+            "ready or the call to setReady() method hasn't been " +
+            "injected to " + d.getClass() + " ");
         d.connect();
         break;
       } catch (IOException e) {
         try {
-          Thread.sleep(10000);
+          Thread.sleep(TEN_SEC);
         } catch (InterruptedException ie) {
         }
       }
@@ -212,7 +226,17 @@ public abstract class AbstractDaemonClus
     ensureClean();
     populateExceptionCounts();
   }
-
+  
+  /**
+   * Mainly used by the test cases to set the list of exceptions
+   * that will be excluded from the daemon log checks.
+   * @param excludeExpList list of exceptions to exclude
+   */
+  public void setExcludeExpList(String[] excludeExpList)
+  {
+    this.excludeExpList = excludeExpList;
+  }
+  
   public void clearAllControlActions() throws IOException {
     for (List<AbstractDaemonClient> set : daemons.values()) {
       for (AbstractDaemonClient daemon : set) {
@@ -248,7 +272,7 @@ public abstract class AbstractDaemonClus
   protected void populateExceptionCounts() throws IOException {
     for(List<AbstractDaemonClient> lst : daemons.values()) {
       for(AbstractDaemonClient d : lst) {
-        d.populateExceptionCount();
+        d.populateExceptionCount(excludeExpList);
       }
     }
   }
@@ -261,7 +285,7 @@ public abstract class AbstractDaemonClus
   protected void assertNoExceptionMessages() throws IOException {
     for(List<AbstractDaemonClient> lst : daemons.values()) {
       for(AbstractDaemonClient d : lst) {
-        d.assertNoExceptionsOccurred();
+        d.assertNoExceptionsOccurred(excludeExpList);
       }
     }
   }
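
From a test case's point of view the exclude list is set once on the cluster, before setUp(), so that the baseline counts taken in populateExceptionCounts() and the later check in assertNoExceptionMessages() both ignore the same patterns. A minimal sketch; the excluded exception name is illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.test.system.MRCluster;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;

    public class ExampleSystemTest {
      private static MRCluster cluster;

      @BeforeClass
      public static void setUp() throws Exception {
        cluster = MRCluster.createCluster(new Configuration());
        // Set before setUp() so populateExceptionCounts() already ignores these.
        cluster.setExcludeExpList(new String[] {"java.net.ConnectException"});
        cluster.setUp();
      }

      @AfterClass
      public static void tearDown() throws Exception {
        cluster.tearDown();
      }
    }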

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java?rev=1077437&r1=1077436&r2=1077437&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java Fri Mar  4 04:14:53 2011
@@ -35,7 +35,7 @@ public interface DaemonProtocol extends 
   /**
    * Returns the Daemon configuration.
    * @return Configuration
-   * @throws IOException
+   * @throws IOException in case of errors
    */
   Configuration getDaemonConf() throws IOException;
 
@@ -51,7 +51,7 @@ public interface DaemonProtocol extends 
    * Check if the Daemon is ready to accept RPC connections.
    * 
    * @return true if Daemon is ready to accept RPC connection.
-   * @throws IOException
+   * @throws IOException in case of errors
    */
   boolean isReady() throws IOException;
 
@@ -60,7 +60,7 @@ public interface DaemonProtocol extends 
    * 
    * @return returns system level view of the Daemon process.
    * 
-   * @throws IOException
+   * @throws IOException in case of errors
    */
   ProcessInfo getProcessInfo() throws IOException;
   
@@ -85,16 +85,16 @@ public interface DaemonProtocol extends 
    * @param local
    *          whether the path is local or not
    * @return the statuses of the files/directories in the given patch
-   * @throws IOException
+   * @throws IOException in case of errors
    */
   FileStatus[] listStatus(String path, boolean local) throws IOException;
   
   /**
    * Enables a particular control action to be performed on the Daemon <br/>
    * 
-   * @param control action to be enabled.
+   * @param action the control action to be enabled.
    * 
-   * @throws IOException
+   * @throws IOException in case of errors
    */
   @SuppressWarnings("unchecked")
   void sendAction(ControlAction action) throws IOException;
@@ -107,7 +107,7 @@ public interface DaemonProtocol extends 
    * 
    * @return true if action is still in waiting queue of 
    *          actions to be delivered.
-   * @throws IOException
+   * @throws IOException in case of errors
    */
   @SuppressWarnings("unchecked")
   boolean isActionPending(ControlAction action) throws IOException;
@@ -117,7 +117,7 @@ public interface DaemonProtocol extends 
    * daemon maintains. <br/>
    * <i><b>Not to be directly called by Test Case or clients.</b></i>
    * @param action to be removed
-   * @throws IOException
+   * @throws IOException in case of errors
    */
   
   @SuppressWarnings("unchecked")
@@ -126,7 +126,7 @@ public interface DaemonProtocol extends 
   /**
    * Clears out the list of control actions on the particular daemon.
    * <br/>
-   * @throws IOException
+   * @throws IOException in case of errors
    */
   void clearActions() throws IOException;
   
@@ -136,7 +136,7 @@ public interface DaemonProtocol extends 
    * <i><b>Not to be directly used by clients</b></i>
    * @param key target
    * @return list of actions.
-   * @throws IOException
+   * @throws IOException in case of errors
    */
   @SuppressWarnings("unchecked")
   ControlAction[] getActions(Writable key) throws IOException;
@@ -145,11 +145,21 @@ public interface DaemonProtocol extends 
    * Gets the number of times a particular pattern has been found in the 
    * daemons log file.<br/>
    * <b><i>Please note that search spans across all previous messages of
-   * Daemon, so better practise is to get previous counts before an operation
-   * and then recheck if the sequence of action has caused any problems</i></b>
-   * @param pattern
+   * Daemon, so better practice is to get previous counts before an operation
+   * and then re-check if the sequence of actions has caused any problems</i></b>
+   * @param pattern to look for in the daemon's log file
+   * @param list list of exceptions to ignore
   * @return number of times the pattern is found in the log file.
-   * @throws IOException
+   * @throws IOException in case of errors
+   */
+  int getNumberOfMatchesInLogFile(String pattern, String[] list) 
+      throws IOException;
+
+  /**
+   * Gets the user who started the particular daemon initially. <br/>
+   * 
+   * @return user who started the particular daemon.
+   * @throws IOException in case of errors
    */
-  int getNumberOfMatchesInLogFile(String pattern) throws IOException;
+  String getDaemonUser() throws IOException;
 }


