hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r1077176 [2/2] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/ src/test/aop/build/ src/test/system/ src/test/system/aop/ src/test/system/aop/org/ src/test/system/aop/org/apache/ src/test/system/aop/org/apache/hadoop/ src/...
Date Fri, 04 Mar 2011 03:48:50 GMT
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,299 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import static org.junit.Assert.*;
+
+/**
+ * JobTracker client for system tests.
+ */
+public class JTClient extends MRDaemonClient<JTProtocol> {
+  static final Log LOG = LogFactory.getLog(JTClient.class);
+  private JobClient client;
+
+  /**
+   * Create JobTracker client to talk to {@link JobTracker} specified in the
+   * configuration. <br/>
+   * 
+   * @param conf
+   *          configuration used to create a client.
+   * @param daemon
+   *          the process management instance for the {@link JobTracker}
+   * @throws IOException
+   */
+  public JTClient(Configuration conf, RemoteProcess daemon) throws 
+    IOException {
+    super(conf, daemon);
+  }
+
+  @Override
+  public synchronized void connect() throws IOException {
+    if (isConnected()) {
+      return;
+    }
+    client = new JobClient(new JobConf(getConf()));
+    setConnected(true);
+  }
+
+  @Override
+  public synchronized void disconnect() throws IOException {
+    client.close();
+  }
+
+  @Override
+  public synchronized JTProtocol getProxy() {
+    return (JTProtocol) client.getProtocol();
+  }
+
+  /**
+   * Gets the {@link JobClient} which can be used for job submission. The
+   * returned JobClient does not contain the decorated APIs and is intended
+   * only for submitting jobs.
+   * 
+   * @return client handle to the JobTracker
+   */
+  public JobClient getClient() {
+    return client;
+  }
+
+  /**
+   * Gets the configuration with which the JobTracker is currently running.<br/>
+   * 
+   * @return configuration of JobTracker.
+   * 
+   * @throws IOException
+   */
+  public Configuration getJobTrackerConfig() throws IOException {
+    return getProxy().getDaemonConf();
+  }
+
+  /**
+   * Verification API to check running jobs and their states.
+   * Users have to ensure that their jobs remain in the running state while
+   * verification is called. <br/>
+   * 
+   * @param jobId
+   *          id of the job to be verified.
+   * 
+   * @throws Exception
+   */
+  public void verifyRunningJob(JobID jobId) throws Exception {
+  }
+
+  private boolean checkJobValidityForProceeding(JobID jobId, JobInfo jobInfo)
+  throws IOException {
+    if (jobInfo != null) {
+      return true;
+    } else if (jobInfo == null && !getProxy().isJobRetired(jobId)) {
+      Assert.fail("Job id : " + jobId + " has never been submitted to JT");
+    }
+    return false;
+  }
+  
+  /**
+   * Submits a job and verifies that it runs and completes successfully.
+   * <br/>
+   * @param conf configuration of the job to be submitted.
+   * @return handle to the submitted job.
+   * @throws Exception
+   */
+  public RunningJob submitAndVerifyJob(Configuration conf) throws Exception {
+    JobConf jconf = new JobConf(conf);
+    RunningJob rJob = getClient().submitJob(jconf);
+    JobID jobId = rJob.getID();
+    verifyRunningJob(jobId);
+    verifyCompletedJob(jobId);
+    return rJob;
+  }
+  
+  /**
+   * Verification API to check if the job completion state is correct. <br/>
+   * 
+   * @param id id of the job to be verified.
+   * @throws Exception
+   */
+  public void verifyCompletedJob(JobID id) throws Exception {
+    RunningJob rJob = getClient().getJob(
+        org.apache.hadoop.mapred.JobID.downgrade(id));
+    while(!rJob.isComplete()) {
+      LOG.info("waiting for job :" + id + " to retire");
+      Thread.sleep(1000);
+      rJob = getClient().getJob(
+          org.apache.hadoop.mapred.JobID.downgrade(id));
+    }
+    verifyJobDetails(id);
+    JobInfo jobInfo = getProxy().getJobInfo(id);
+    if(jobInfo == null && 
+        !getProxy().isJobRetired(id)) {
+      Assert.fail("The passed job id : " + id + 
+          " is not submitted to JT.");
+    }
+    // Guard against the job being retired (jobInfo == null) while waiting
+    // for the history file to be copied.
+    while(jobInfo != null && !jobInfo.isHistoryFileCopied()) {
+      Thread.sleep(1000);
+      LOG.info(id + " waiting for history file to be copied");
+      jobInfo = getProxy().getJobInfo(id);
+    }
+    verifyJobHistory(id);
+  }
+
+  /**
+   * Verification API to check if the job details are semantically correct.<br/>
+   * 
+   * @param jobId
+   *          jobID of the job to be verified.
+   * @throws Exception
+   */
+  public void verifyJobDetails(JobID jobId) throws Exception {
+    // wait till the setup is launched and finished.
+    JobInfo jobInfo = getProxy().getJobInfo(jobId);
+    if(!checkJobValidityForProceeding(jobId, jobInfo)){
+      return;
+    }
+    LOG.info("waiting for the setup to be finished");
+    while (!jobInfo.isSetupFinished()) {
+      Thread.sleep(2000);
+      jobInfo = getProxy().getJobInfo(jobId);
+    }
+    // verify job id.
+    assertTrue(jobId.toString().startsWith("job_"));
+    LOG.info("verified job id and is : " + jobId.toString());
+    // verify the number of map/reduce tasks.
+    verifyNumTasks(jobId);
+    // should verify job progress.
+    verifyJobProgress(jobId);
+    jobInfo = getProxy().getJobInfo(jobId);
+    if (jobInfo.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+      // verify if map/reduce progress reached 1.
+      jobInfo = getProxy().getJobInfo(jobId);
+      checkJobValidityForProceeding(jobId, jobInfo);
+      assertEquals(1.0, jobInfo.getStatus().mapProgress(), 0.001);
+      assertEquals(1.0, jobInfo.getStatus().reduceProgress(), 0.001);
+      // verify successful finish of tasks.
+      verifyAllTasksSuccess(jobId);
+    }
+    if (jobInfo.getStatus().isJobComplete()) {
+      // verify if the cleanup is launched.
+      jobInfo = getProxy().getJobInfo(jobId);
+      checkJobValidityForProceeding(jobId, jobInfo);
+      assertTrue(jobInfo.isCleanupLaunched());
+      LOG.info("Verified launching of cleanup");
+    }
+  }
+
+  /**
+   * Verification API to check that every task of the job has at least one
+   * successful attempt. <br/>
+   * 
+   * @param jobId id of the job to be verified.
+   * @throws IOException
+   */
+  public void verifyAllTasksSuccess(JobID jobId) throws IOException {
+    JobInfo jobInfo = getProxy().getJobInfo(jobId);
+    
+    if(!checkJobValidityForProceeding(jobId, jobInfo)){ 
+      return;
+    }
+    
+    TaskInfo[] taskInfos = getProxy().getTaskInfo(jobId);
+    
+    if(taskInfos.length == 0 && getProxy().isJobRetired(jobId)) {
+      LOG.info("Job has been retired from JT memory : " + jobId);
+      return;
+    }
+    
+    for (TaskInfo taskInfo : taskInfos) {
+      TaskStatus[] taskStatus = taskInfo.getTaskStatus();
+      if (taskStatus != null && taskStatus.length > 0) {
+        int i;
+        for (i = 0; i < taskStatus.length; i++) {
+          if (TaskStatus.State.SUCCEEDED.equals(taskStatus[i].getRunState())) {
+            break;
+          }
+        }
+        assertFalse(i == taskStatus.length);
+      }
+    }
+    LOG.info("verified that none of the tasks failed.");
+  }
+
+  /**
+   * Verification API to check that the map and reduce progress of the job
+   * lie in the range [0, 1]. <br/>
+   * 
+   * @param jobId id of the job to be verified.
+   * @throws IOException
+   */
+  public void verifyJobProgress(JobID jobId) throws IOException {
+    JobInfo jobInfo = getProxy().getJobInfo(jobId);
+    if(!checkJobValidityForProceeding(jobId, jobInfo)){
+      return;
+    }
+    assertTrue(jobInfo.getStatus().mapProgress() >= 0 && jobInfo.getStatus()
+        .mapProgress() <= 1);
+    LOG.info("verified map progress and is "
+        + jobInfo.getStatus().mapProgress());    
+    assertTrue(jobInfo.getStatus().reduceProgress() >= 0 && jobInfo.getStatus()
+        .reduceProgress() <= 1);
+    LOG.info("verified reduce progress and is "
+        + jobInfo.getStatus().reduceProgress());
+  }
+
+  /**
+   * Verification API to check that the task counts of the job are consistent,
+   * i.e. running, waiting and finished tasks add up to the total number of
+   * maps and reduces. <br/>
+   * 
+   * @param jobId id of the job to be verified.
+   * @throws IOException
+   */
+  public void verifyNumTasks(JobID jobId) throws IOException {
+    JobInfo jobInfo = getProxy().getJobInfo(jobId);
+    if(!checkJobValidityForProceeding(jobId, jobInfo)) {
+      return;
+    }
+    assertEquals(jobInfo.numMaps(), (jobInfo.runningMaps()
+        + jobInfo.waitingMaps() + jobInfo.finishedMaps()));
+    LOG.info("verified number of map tasks and is " + jobInfo.numMaps());
+    
+    assertEquals(jobInfo.numReduces(),  (jobInfo.runningReduces()
+        + jobInfo.waitingReduces() + jobInfo.finishedReduces()));
+    LOG.info("verified number of reduce tasks and is "
+        + jobInfo.numReduces());
+  }
+
+  /**
+   * Verification API to check if the job history file is semantically correct.
+   * <br/>
+   * 
+   * @param jobId
+   *          id of the job to be verified.
+   * @throws IOException
+   */
+  public void verifyJobHistory(JobID jobId) throws IOException {
+    JobInfo info = getProxy().getJobInfo(jobId);
+    String url = "";
+    if(info == null && !getProxy().isJobRetired(jobId)) {
+      Assert.fail("Job id : " + jobId + 
+          " has never been submitted to JT");
+    } else if(info == null) {
+      LOG.info("Job has been retired from JT memory : " + jobId);
+      url = getProxy().getJobHistoryLocationForRetiredJob(jobId);
+    } else {
+      url = info.getHistoryUrl();
+    }
+    Path p = new Path(url);
+    // The history file is on the local file system only when the URL has a
+    // "file" scheme; otherwise look it up on the distributed file system.
+    boolean local = "file".equals(p.toUri().getScheme());
+    FileStatus st = getFileStatus(url, local);
+    Assert.assertNotNull("Job History file for " + jobId + " not present " +
+        "when job is completed", st);
+    LOG.info("Verified the job history for the jobId : " + jobId);
+  }
+}
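
A minimal usage sketch (not part of the commit) of how JTClient and MRCluster might be combined in a system test; the job configuration details are placeholders:

    // Hypothetical test snippet; everything except the classes added in this
    // commit (MRCluster, JTClient) is assumed or a placeholder.
    Configuration conf = new Configuration();
    MRCluster cluster = MRCluster.createCluster(conf);
    cluster.setUp();                          // wait for daemons, connect, ping
    JTClient jtClient = cluster.getMaster();  // JobTracker client handle
    JobConf job = new JobConf(cluster.getConf());
    // ... set mapper/reducer classes and input/output paths here ...
    RunningJob running = jtClient.submitAndVerifyJob(job);
    cluster.tearDown();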

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,91 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+/**
+ * Client side API's exposed from JobTracker.
+ */
+public interface JTProtocol extends DaemonProtocol {
+  long versionID = 1L;
+
+  /**
+   * Gets the information pertaining to a given job.<br/>
+   * 
+   * @param jobID
+   *          id of the job for which information is required.
+   * @return information regarding the job.
+   * @throws IOException
+   */
+  public JobInfo getJobInfo(JobID jobID) throws IOException;
+
+  /**
+   * Gets the information pertaining to a task. <br/>
+   * 
+   * @param taskID
+   *          id of the task for which information is required.
+   * @return information regarding the task.
+   * @throws IOException
+   */
+  public TaskInfo getTaskInfo(TaskID taskID) throws IOException;
+
+  /**
+   * Gets the information pertaining to a given TaskTracker. <br/>
+   * 
+   * @param trackerName
+   *          name of the tracker.
+   * @return information regarding the tracker.
+   * @throws IOException
+   */
+  public TTInfo getTTInfo(String trackerName) throws IOException;
+
+  /**
+   * Gets a list of all the jobs available with the JobTracker.<br/>
+   * 
+   * @return list of all jobs.
+   * @throws IOException
+   */
+  public JobInfo[] getAllJobInfo() throws IOException;
+
+  /**
+   * Gets a list of tasks pertaining to a job. <br/>
+   * 
+   * @param jobID
+   *          id of the job.
+   * 
+   * @return list of all tasks for the job.
+   * @throws IOException
+   */
+  public TaskInfo[] getTaskInfo(JobID jobID) throws IOException;
+
+  /**
+   * Gets a list of TaskTrackers which have reported to the JobTracker. <br/>
+   * 
+   * @return list of all TaskTrackers.
+   * @throws IOException
+   */
+  public TTInfo[] getAllTTInfo() throws IOException;
+
+  /**
+   * Checks if a given job has been retired from the JobTracker's memory. <br/>
+   * 
+   * @param jobID
+   *          id of the job.
+   * @return true if job is retired.
+   * @throws IOException
+   */
+  boolean isJobRetired(JobID jobID) throws IOException;
+
+  /**
+   * Gets the location of the history file for a retired job. <br/>
+   * 
+   * @param jobID
+   *          id of the job.
+   * @return location of history file
+   * @throws IOException
+   */
+  String getJobHistoryLocationForRetiredJob(JobID jobID) throws IOException;
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,121 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Job state information as seen by the JobTracker.
+ */
+public interface JobInfo extends Writable {
+  /**
+   * Gets the JobId of the job.<br/>
+   * 
+   * @return id of the job.
+   */
+  JobID getID();
+
+  /**
+   * Gets the current status of the job.<br/>
+   * 
+   * @return status.
+   */
+  JobStatus getStatus();
+
+  /**
+   * Gets the history location of the job.<br/>
+   * 
+   * @return the path to the history file.
+   */
+  String getHistoryUrl();
+
+  /**
+   * Gets the number of maps which are currently running for the job. <br/>
+   * 
+   * @return number of maps running for the job.
+   */
+  int runningMaps();
+
+  /**
+   * Gets the number of reduces currently running for the job. <br/>
+   * 
+   * @return number of reduces running for the job.
+   */
+  int runningReduces();
+
+  /**
+   * Gets the number of maps to be scheduled for the job. <br/>
+   * 
+   * @return number of waiting maps.
+   */
+  int waitingMaps();
+
+  /**
+   * Gets the number of reduces to be scheduled for the job. <br/>
+   * 
+   * @return number of waiting reduces.
+   */
+  int waitingReduces();
+  
+  /**
+   * Gets the number of maps that are finished. <br/>
+   * @return the number of finished maps.
+   */
+  int finishedMaps();
+  
+  /**
+   * Gets the number of map tasks that are to be spawned for the job. <br/>
+   * @return number of map tasks to be spawned for the job.
+   */
+  int numMaps();
+  
+  /**
+   * Gets the number of reduce tasks that are to be spawned for the job. <br/>
+   * @return number of reduce tasks to be spawned for the job.
+   */
+  int numReduces();
+  
+  /**
+   * Gets the number of reduces that are finished. <br/>
+   * @return the number of finished reduces.
+   */
+  int finishedReduces();
+
+  /**
+   * Gets if cleanup for the job has been launched.<br/>
+   * 
+   * @return true if cleanup task has been launched.
+   */
+  boolean isCleanupLaunched();
+
+  /**
+   * Gets if the setup for the job has been launched.<br/>
+   * 
+   * @return true if setup task has been launched.
+   */
+  boolean isSetupLaunched();
+
+  /**
+   * Gets if the setup for the job has been completed.<br/>
+   * 
+   * @return true if the setup task for the job has completed.
+   */
+  boolean isSetupFinished();
+
+  /**
+   * Gets list of blacklisted trackers for the particular job. <br/>
+   * 
+   * @return list of blacklisted tracker names.
+   */
+  List<String> getBlackListedTrackers();
+  
+  /**
+   * Gets if the history file of the job has been copied to the done
+   * location. <br/>
+   * 
+   * @return true if history file copied.
+   */
+  boolean isHistoryFileCopied();
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,72 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.system.AbstractMasterSlaveCluster;
+import org.apache.hadoop.test.system.process.ClusterProcessManager;
+import org.apache.hadoop.test.system.process.ClusterProcessManagerFactory;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+import org.apache.hadoop.test.system.process.ClusterProcessManager.ClusterType;
+
+/**
+ * Concrete MasterSlaveCluster representing a Map-Reduce cluster.
+ * 
+ */
+public class MRCluster extends AbstractMasterSlaveCluster<JTClient, 
+      TTClient> {
+
+  private static final Log LOG = LogFactory.getLog(MRCluster.class);
+
+  private MRCluster(Configuration conf, ClusterProcessManager rCluster)
+      throws IOException {
+    super(conf, rCluster);
+  }
+
+  /**
+   * Creates an instance of the Map-Reduce cluster.<br/>
+   * Example usage: <br/>
+   * <code>
+   * Configuration conf = new Configuration();<br/>
+   * conf.set(ClusterProcessManager.IMPL_CLASS,
+   * org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster.
+   * class.getName())<br/>
+   * conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPHOME,
+   * "/path");<br/>
+   * conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPCONFDIR,
+   * "/path");<br/>
+   * MRCluster cluster = MRCluster.createCluster(conf);
+   * </code>
+   * 
+   * @param conf
+   *          contains all required parameter to create cluster.
+   * @return a cluster instance to be managed.
+   * @throws IOException
+   * @throws Exception
+   */
+  public static MRCluster createCluster(Configuration conf) 
+      throws IOException, Exception {
+    return new MRCluster(conf, ClusterProcessManagerFactory.createInstance(
+        ClusterType.MAPRED, conf));
+  }
+
+  @Override
+  protected JTClient createMaster(RemoteProcess masterDaemon)
+      throws IOException {
+    return new JTClient(getConf(), masterDaemon);
+  }
+
+  @Override
+  protected TTClient createSlave(RemoteProcess slaveDaemon) 
+      throws IOException {
+    return new TTClient(getConf(), slaveDaemon);
+  }
+
+  @Override
+  public void ensureClean() throws IOException {
+    //TODO: ensure that no jobs/tasks are running
+    //restart the cluster if cleanup fails
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,28 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.apache.hadoop.test.system.DaemonProtocol;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * Base class for JobTracker and TaskTracker clients.
+ */
+public abstract class MRDaemonClient<PROXY extends DaemonProtocol> 
+    extends AbstractDaemonClient<PROXY>{
+
+  public MRDaemonClient(Configuration conf, RemoteProcess process)
+      throws IOException {
+    super(conf, process);
+  }
+
+  public String[] getMapredLocalDirs() throws IOException {
+    return getProxy().getDaemonConf().getStrings("mapred.local.dir");
+  }
+
+  public String getLogDir() throws IOException {
+    return getProcessInfo().getSystemProperties().get("hadoop.log.dir");
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRFault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRFault.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRFault.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRFault.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,12 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+/**
+ * Fault injection types. At a given time any of these faults (0 or more) 
+ * can be injected. 
+ * @see AbstractMasterSlaveCluster#enable(List<Enum>)
+ * @see AbstractMasterSlaveCluster#disable(List<Enum>)
+ */
+public enum MRFault {
+  BAD_NODE_HEALTH,
+  STALL_HEARTBEAT
+}
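
A short sketch (not part of the commit) of how these fault types might be injected through the enable/disable API defined in AbstractMasterSlaveCluster below; the surrounding test setup is assumed:

    // Hypothetical snippet: 'cluster' is an already set-up MRCluster instance.
    List<Enum<?>> faults = new ArrayList<Enum<?>>();
    faults.add(MRFault.STALL_HEARTBEAT);
    cluster.enable(faults);       // injected on the master and every slave;
                                  // the cluster is restarted if injection fails
    // ... exercise the behaviour under a stalled heartbeat ...
    cluster.disableAllFaults();   // remove all enabled faults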

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,72 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * TaskTracker client for system tests. The class assumes that the
+ * configuration key {@code mapred.task.tracker.report.address} is set;
+ * only the port portion of the address is used.
+ */
+public class TTClient extends MRDaemonClient<TTProtocol> {
+
+  TTProtocol proxy;
+
+  public TTClient(Configuration conf, RemoteProcess daemon) 
+      throws IOException {
+    super(conf, daemon);
+  }
+
+  @Override
+  public synchronized void connect() throws IOException {
+    if (isConnected()) {
+      return;
+    }
+    String sockAddrStr = getConf()
+        .get("mapred.task.tracker.report.address");
+    if (sockAddrStr == null) {
+      throw new IllegalArgumentException(
+          "TaskTracker report address is not set");
+    }
+    String[] splits = sockAddrStr.split(":");
+    if (splits.length != 2) {
+      throw new IllegalArgumentException(
+          "TaskTracker report address not correctly configured");
+    }
+    String port = splits[1];
+    String sockAddr = getHostName() + ":" + port;
+    InetSocketAddress bindAddr = NetUtils.createSocketAddr(sockAddr);
+    proxy = (TTProtocol) RPC.getProxy(TTProtocol.class, TTProtocol.versionID,
+        bindAddr, getConf());
+    setConnected(true);
+  }
+
+  @Override
+  public synchronized void disconnect() throws IOException {
+    RPC.stopProxy(proxy);
+  }
+
+  @Override
+  public synchronized TTProtocol getProxy() {
+    return proxy;
+  }
+
+  /**
+   * Gets the status last sent to the {@link JobTracker}. <br/>
+   * 
+   * @return the task tracker status.
+   * @throws IOException
+   */
+  public TaskTrackerStatus getStatus() throws IOException {
+    return getProxy().getStatus();
+  }
+
+}
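
A brief sketch (not part of the commit) of how TTClient might be used to inspect the slaves of a cluster; 'cluster' is assumed to be a set-up MRCluster:

    // Hypothetical snippet: iterate over the TaskTracker clients of the cluster.
    for (TTClient tt : cluster.getSlaves().values()) {
      TaskTrackerStatus status = tt.getStatus();  // last status sent to the JT
      System.out.println(status.getTrackerName() + " is running "
          + status.countMapTasks() + " map task(s)");
    }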

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,24 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+
+/**
+ * TaskTracker state information as seen by the JobTracker.
+ */
+public interface TTInfo extends Writable {
+  /**
+   * Gets the {@link TaskTracker} name.<br/>
+   * 
+   * @return name of the tracker.
+   */
+  String getName();
+
+  /**
+   * Gets the current status of the {@link TaskTracker} <br/>
+   * 
+   * @return status of the {@link TaskTracker}
+   */
+  TaskTrackerStatus getStatus();
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,32 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+/**
+ * TaskTracker RPC interface to be used for cluster tests.
+ */
+public interface TTProtocol extends DaemonProtocol {
+
+  public static final long versionID = 1L;
+  /**
+   * Gets the latest status which was sent in a heartbeat to the {@link JobTracker}.
+   * <br/>
+   * 
+   * @return status
+   * @throws IOException
+   */
+  TaskTrackerStatus getStatus() throws IOException;
+
+  /**
+   * Gets a list of all the tasks in the {@link TaskTracker}.<br/>
+   * 
+   * @return list of all the tasks
+   * @throws IOException
+   */
+  TTTaskInfo[] getTasks() throws IOException;
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,40 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.TaskTracker;
+
+/**
+ * Task state information as seen by the TT.
+ */
+public interface TTTaskInfo extends Writable {
+  /**
+   * Gets the task associated with this instance as seen by the {@link TaskTracker}.
+   * <br/>
+   * 
+   * @return task.
+   */
+  Task getTask();
+
+  /**
+   * Gets the diagnostic information associated with the task.<br/>
+   * 
+   * @return diagnostic information of the task.
+   */
+  String getDiagnosticInfo();
+
+  /**
+   * Has task occupied a slot? A task occupies a slot once it starts localizing
+   * on the {@link TaskTracker} <br/>
+   * 
+   * @return true if task has started occupying a slot.
+   */
+  boolean slotTaken();
+
+  /**
+   * Has the task been killed? <br/>
+   * 
+   * @return true, if task has been killed.
+   */
+  boolean wasKilled();
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,57 @@
+package org.apache.hadoop.mapreduce.test.system;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Task state information of a TaskInProgress as seen by the {@link JobTracker}
+ */
+public interface TaskInfo extends Writable {
+  /**
+   * Gets the task id of the TaskInProgress.
+   * 
+   * @return id of the task.
+   */
+  TaskID getTaskID();
+
+  /**
+   * Number of times task attempts have failed for the given TaskInProgress.
+   * <br/>
+   * 
+   * @return number of failed task attempts.
+   */
+  int numFailedAttempts();
+
+  /**
+   * Number of times task attempts have been killed for the given TaskInProgress 
+   * <br/>
+   * 
+   * @return number of killed task attempts.
+   */
+  int numKilledAttempts();
+
+  /**
+   * Gets the progress of the Task as a fraction, in the range 0.0 to 1.0.
+   * <br/>
+   * 
+   * @return progress of the task as a fraction.
+   */
+  double getProgress();
+
+  /**
+   * Number of attempts currently running for the given TaskInProgress.<br/>
+   * 
+   * @return number of running attempts.
+   */
+  int numRunningAttempts();
+
+  /**
+   * Array of TaskStatus objects that are related to the corresponding
+   * TaskInProgress object.
+   * 
+   * @return array of TaskStatus objects for the attempts of the task.
+   */
+  TaskStatus[] getTaskStatus();
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,121 @@
+package org.apache.hadoop.test.system;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+/**
+ * Abstract class which encapsulates the daemon client used in the
+ * system tests.<br/>
+ * 
+ * @param <PROXY> the proxy implementation of a specific Daemon
+ */
+public abstract class AbstractDaemonClient<PROXY extends DaemonProtocol> {
+  private Configuration conf;
+  private RemoteProcess process;
+  private boolean connected;
+
+  /**
+   * Create a Daemon client.<br/>
+   * 
+   * @param conf configuration to be used by the proxy to connect to the Daemon.
+   * @param process the remote process handle to manage the particular daemon.
+   * 
+   * @throws IOException
+   */
+  public AbstractDaemonClient(Configuration conf, RemoteProcess process) 
+      throws IOException {
+    this.conf = conf;
+    this.process = process;
+  }
+
+  public boolean isConnected() {
+    return connected;
+  }
+
+  protected void setConnected(boolean connected) {
+    this.connected = connected;
+  }
+
+  public abstract void connect() throws IOException;
+
+  public abstract void disconnect() throws IOException;
+
+  /**
+   * Get the proxy to connect to a particular service Daemon.<br/>
+   * 
+   * @return proxy to connect to a particular service Daemon.
+   */
+  protected abstract PROXY getProxy();
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public String getHostName() {
+    return process.getHostName();
+  }
+
+  public boolean isReady() throws IOException {
+    return getProxy().isReady();
+  }
+
+  public void kill() throws IOException {
+    process.kill();
+  }
+
+  public void ping() throws IOException {
+    getProxy().ping();
+  }
+
+  public void start() throws IOException {
+    process.start();
+  }
+
+  public ProcessInfo getProcessInfo() throws IOException {
+    return getProxy().getProcessInfo();
+  }
+
+  public void enable(List<Enum<?>> faults) throws IOException {
+    getProxy().enable(faults);
+  }
+
+  public void disableAll() throws IOException {
+    getProxy().disableAll();
+  }
+
+  public FileStatus getFileStatus(String path, boolean local) throws IOException {
+    return getProxy().getFileStatus(path, local);
+  }
+
+  public FileStatus[] listStatus(String path, boolean local) 
+    throws IOException {
+    return getProxy().listStatus(path, local);
+  }
+
+  public FileStatus[] listStatus(String f, boolean local, boolean recursive) 
+    throws IOException {
+    List<FileStatus> status = new ArrayList<FileStatus>();
+    addStatus(status, f, local, recursive);
+    return status.toArray(new FileStatus[0]);
+  }
+
+  private void addStatus(List<FileStatus> status, String f, 
+      boolean local, boolean recursive) 
+    throws IOException {
+    FileStatus[] fs = listStatus(f, local);
+    if (fs != null) {
+      for (FileStatus fileStatus : fs) {
+        if (!f.equals(fileStatus.getPath().toString())) {
+          status.add(fileStatus);
+          if (recursive) {
+            addStatus(status, fileStatus.getPath().toString(), local, recursive);
+          }
+        }
+      }
+    }
+  }
+}
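
A small sketch (not part of the commit) of the recursive listStatus helper defined above; the path is a placeholder and 'daemon' is assumed to be any connected AbstractDaemonClient:

    // Hypothetical snippet: recursively list files under a directory on the
    // daemon's local file system.
    FileStatus[] files = daemon.listStatus("/path/to/daemon/logs", true, true);
    for (FileStatus status : files) {
      System.out.println(status.getPath() + " : " + status.getLen() + " bytes");
    }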

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,323 @@
+package org.apache.hadoop.test.system;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.system.process.ClusterProcessManager;
+import org.apache.hadoop.test.system.process.RemoteProcess;
+
+/**
+ * Abstract class which represents a cluster that contains a single master and
+ * one or more slaves.<br/>
+ * 
+ * @param <MASTER>
+ *          master daemon client type.
+ * @param <SLAVE>
+ *          slave daemon client type.
+ */
+public abstract class AbstractMasterSlaveCluster
+    <MASTER extends AbstractDaemonClient, SLAVE extends AbstractDaemonClient> {
+
+  public static final String WAITFORMASTERKEY = 
+    "test.system.abstractmasterslavecluster.waitformaster";
+  
+  private static final Log LOG = 
+    LogFactory.getLog(AbstractMasterSlaveCluster.class);
+
+  private Configuration conf;
+  protected ClusterProcessManager clusterManager;
+  private MASTER master;
+  private Map<String, SLAVE> slaves = new HashMap<String, SLAVE>();
+  private boolean waitformaster = false;
+
+  /**
+   * Constructor to create a master slave cluster.<br/>
+   * 
+   * @param conf
+   *          Configuration to be used while constructing the cluster.
+   * @param rcluster
+   *          process manager instance to be used for managing the daemons.
+   * 
+   * @throws IOException
+   */
+  public AbstractMasterSlaveCluster(Configuration conf,
+      ClusterProcessManager rcluster) throws IOException {
+    this.conf = conf;
+    this.clusterManager = rcluster;
+    this.master = createMaster(clusterManager.getMaster());
+    Iterator<Map.Entry<String, RemoteProcess>> it = clusterManager.getSlaves()
+        .entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<String, RemoteProcess> entry = it.next();
+      slaves.put(entry.getKey(), createSlave(entry.getValue()));
+    }
+    this.waitformaster = conf.getBoolean(WAITFORMASTERKEY, true);
+  }
+
+  /**
+   * Method to create the master daemon client.<br/>
+   * 
+   * @param masterDaemon
+   *          remote process handle to manage the master daemon.
+   * @return instance of the daemon client of master daemon.
+   * 
+   * @throws IOException
+   */
+  protected abstract MASTER createMaster(RemoteProcess masterDaemon)
+      throws IOException;
+
+  /**
+   * Method to create the slave daemons clients.<br/>
+   * 
+   * @param slaveDaemon
+   *          remote process handle to manage the slave daemon.
+   * @return instance of the daemon clients of slave daemons.
+   * 
+   * @throws IOException
+   */
+  protected abstract SLAVE createSlave(RemoteProcess slaveDaemon)
+      throws IOException;
+
+  /**
+   * Get the global cluster configuration which was used to create the 
+   * cluster. <br/>
+   * 
+   * @return global configuration of the cluster.
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Return the client handle of the master Daemon.<br/>
+   * 
+   * @return master daemon client handle.
+   */
+  public MASTER getMaster() {
+    return master;
+  }
+
+  /**
+   * Return the client handle of the slave Daemons.<br/>
+   * 
+   * @return map of host to slave daemon clients.
+   */
+  public Map<String, SLAVE> getSlaves() {
+    return slaves;
+  }
+
+  /**
+   * Checks if the master slave cluster is ready for testing. <br/>
+   * Algorithm for checking is as follows : <br/>
+   * <ul>
+   * <li> Wait for Daemon to come up </li>
+   * <li> Check if daemon is ready </li>
+   * <li> If one of the daemons is not ready, return false </li>
+   * </ul> 
+   * 
+   * @return true if whole cluster is ready.
+   * 
+   * @throws IOException
+   */
+  public boolean isReady() throws IOException {
+    LOG.info("Check if master is up and running");
+    waitForDaemon(master);
+    if (!master.isReady()) {
+      return false;
+    }
+    LOG.info("Check if slaves are up and running");
+    for (SLAVE slave : slaves.values()) {
+      waitForDaemon(slave);
+      if (!slave.isReady()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void waitForDaemon(AbstractDaemonClient d) {
+    while(true) {
+      try {
+        LOG.info("Waiting for daemon in host to come up : " + d.getHostName());
+        d.connect();
+        break;
+      } catch (IOException e) {
+        try {
+          Thread.sleep(10000);
+        } catch (InterruptedException ie) {
+        }
+      }
+    }
+  }
+
+  /**
+   * Start the master slave cluster. <br/>
+   * The startup behavior is controlled by the {@code WAITFORMASTERKEY}.
+   * <ul>
+   * <li>If {@code WAITFORMASTERKEY} is set to true, the slaves are started
+   * only after the master daemon comes up and is ready to accept RPC
+   * connections.</li>
+   * <li>Else the daemons are started up sequentially without waiting for master
+   * daemon to be ready.</li>
+   * </ul>
+   * 
+   * @throws IOException
+   */
+  public void start() throws IOException {
+    if (waitformaster) {
+      this.master.start();
+      waitForMaster();
+      startSlaves();
+    } else {
+      clusterManager.start();
+    }
+  }
+
+  private void waitForMaster() throws IOException {
+    waitForDaemon(master);
+    while (!master.isReady()) {
+      try {
+        LOG.info("Waiting for master daemon to be ready to accept " +
+        		"RPC connection");
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private void startSlaves() throws IOException {
+    Map<String, RemoteProcess> slaves = clusterManager.getSlaves();
+    for (RemoteProcess s : slaves.values()) {
+      s.start();
+    }
+  }
+
+  /**
+   * Stops the master slave cluster.<br/>
+   * 
+   * @throws IOException
+   */
+  public void stop() throws IOException {
+    clusterManager.stop();
+  }
+
+  /**
+   * Connect to master and slave RPC ports.
+   * @throws IOException
+   */
+  public void connect() throws IOException {
+    LOG.info("Connecting to the cluster..." + getClass().getName());
+    master.connect();
+    for (SLAVE slave : slaves.values()) {
+      slave.connect();
+    }
+  }
+
+  /**
+   * Disconnect from master and slave RPC ports.
+   * @throws IOException
+   */
+  public void disconnect() throws IOException {
+    LOG.info("Disconnecting to the cluster..." + 
+        getClass().getName());
+    master.disconnect();
+    for (SLAVE slave : slaves.values()) {
+      slave.disconnect();
+    }
+    LOG.info("Disconnected!!");
+  }
+
+  /**
+   * Enable/Inject the faults. If the faults cannot be enabled on ALL nodes,
+   * the cluster is restarted.
+   */
+  public void enable(List<Enum<?>> faults) throws IOException {
+    try {
+      enableFaults(faults);
+    } catch (IOException e) {
+      stop();
+      start();
+      enableFaults(faults);
+    }
+  }
+
+  /**
+   * Disable/Remove all the faults. If the faults cannot be disabled on ALL
+   * nodes, the cluster is restarted.
+   */
+  public void disableAllFaults() throws IOException {
+    try {
+      disableFaults();
+    } catch (IOException e) {
+      stop();
+      start();
+      disableFaults();
+    }
+  }
+
+  private void enableFaults(List<Enum<?>> faults) throws IOException {
+    master.enable(faults);
+    for (SLAVE slave : slaves.values()) {
+      slave.enable(faults);
+    }
+  }
+
+  private void disableFaults() throws IOException {
+    master.disableAll();
+    for (SLAVE slave : slaves.values()) {
+      slave.disableAll();
+    }
+  }
+
+  /**
+   * Ping all the daemons of the cluster.
+   * @throws IOException
+   */
+  public void ping() throws IOException {
+    MASTER master = getMaster();
+    LOG.info("Master is :" + master.getHostName() + " pinging ...");
+    master.ping();
+    Collection<SLAVE> slaves = getSlaves().values();
+    for (SLAVE slave : slaves) {
+      LOG.info("Slave is : " + slave.getHostName() + " pinging....");
+      slave.ping();
+    }
+  }
+
+  /**
+   * Connect to the cluster and ensure that it is clean to run tests.
+   * @throws Exception
+   */
+  public void setUp() throws Exception {
+    while (!isReady()) {
+      Thread.sleep(1000);
+    }
+    connect();
+    ping();
+    ensureClean();
+  }
+
+  /**
+   * Ensure that the cluster is clean to run tests.
+   * @throws IOException
+   */
+  public void ensureClean() throws IOException {
+  }
+
+  /**
+   * Ensure that cluster is clean. Disconnect from the RPC ports of the daemons.
+   * @throws IOException
+   */
+  public void tearDown() throws IOException {
+    ensureClean();
+    disconnect();
+  }
+}
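
A minimal sketch (not part of the commit) showing how the WAITFORMASTERKEY switch described in start() could be set before creating a cluster; other configuration keys (such as ClusterProcessManager.IMPL_CLASS) are assumed to be set as in the MRCluster javadoc above:

    // Hypothetical snippet: start the slaves only after the master is ready.
    Configuration conf = new Configuration();
    conf.setBoolean(AbstractMasterSlaveCluster.WAITFORMASTERKEY, true);
    MRCluster cluster = MRCluster.createCluster(conf);
    cluster.start();  // master first, wait for RPC readiness, then the slaves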

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,90 @@
+package org.apache.hadoop.test.system;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * RPC interface of a given Daemon.
+ */
+public interface DaemonProtocol extends VersionedProtocol {
+  long versionID = 1L;
+
+  /**
+   * Returns the Daemon configuration.
+   * @return Configuration
+   * @throws IOException
+   */
+  Configuration getDaemonConf() throws IOException;
+
+  /**
+   * Check if the Daemon is alive.
+   * 
+   * @throws IOException
+   *           if Daemon is unreachable.
+   */
+  void ping() throws IOException;
+
+  /**
+   * Check if the Daemon is ready to accept RPC connections.
+   * 
+   * @return true if Daemon is ready to accept RPC connection.
+   * @throws IOException
+   */
+  boolean isReady() throws IOException;
+
+  /**
+   * Get system level view of the Daemon process.
+   * 
+   * @return returns system level view of the Daemon process.
+   * 
+   * @throws IOException
+   */
+  ProcessInfo getProcessInfo() throws IOException;
+
+  /**
+   * Enable the set of specified faults in the Daemon.<br/>
+   * 
+   * @param faults
+   *          list of faults to be enabled.
+   * 
+   * @throws IOException
+   */
+  void enable(List<Enum<?>> faults) throws IOException;
+
+  /**
+   * Disable all the faults which are enabled in the Daemon. <br/>
+   * 
+   * @throws IOException
+   */
+  void disableAll() throws IOException;
+
+  /**
+   * Return a file status object that represents the path.
+   * @param path
+   *          given path
+   * @param local
+   *          whether the path is local or not
+   * @return a FileStatus object
+   * @throws FileNotFoundException when the path does not exist;
+   *         IOException see specific implementation
+   */
+  FileStatus getFileStatus(String path, boolean local) throws IOException;
+
+  /**
+   * List the statuses of the files/directories in the given path if the path is
+   * a directory.
+   * 
+   * @param path
+   *          given path
+   * @param local
+   *          whether the path is local or not
+   * @return the statuses of the files/directories in the given path
+   * @throws IOException
+   */
+  FileStatus[] listStatus(String path, boolean local) throws IOException;
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ProcessInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ProcessInfo.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ProcessInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ProcessInfo.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,59 @@
+package org.apache.hadoop.test.system;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Daemon system level process information.
+ */
+public interface ProcessInfo extends Writable {
+  /**
+   * Get the current time in milliseconds.<br/>
+   * 
+   * @return current time on the daemon clock in milliseconds.
+   */
+  public long currentTimeMillis();
+
+  /**
+   * Get the environment that was used to start the Daemon process.<br/>
+   * 
+   * @return the environment variable list.
+   */
+  public Map<String,String> getEnv();
+
+  /**
+   * Get the System properties of the Daemon process.<br/>
+   * 
+   * @return the properties list.
+   */
+  public Map<String,String> getSystemProperties();
+
+  /**
+   * Get the number of active threads in Daemon VM.<br/>
+   * 
+   * @return number of active threads in Daemon VM.
+   */
+  public int activeThreadCount();
+
+  /**
+   * Get the maximum heap size that is configured for the Daemon VM. <br/>
+   * 
+   * @return maximum heap size.
+   */
+  public long maxMemory();
+
+  /**
+   * Get the free memory in Daemon VM.<br/>
+   * 
+   * @return free memory.
+   */
+  public long freeMemory();
+
+  /**
+   * Get the total used memory in the Daemon VM. <br/>
+   * 
+   * @return total used memory.
+   */
+  public long totalMemory();
+}
\ No newline at end of file

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ProcessInfoImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ProcessInfoImpl.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ProcessInfoImpl.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ProcessInfoImpl.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,141 @@
+package org.apache.hadoop.test.system;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class ProcessInfoImpl implements ProcessInfo {
+
+  private int threadCount;
+  private long currentTime;
+  private long freemem;
+  private long maxmem;
+  private long totmem;
+  private Map<String, String> env;
+  private Map<String, String> props;
+
+  public ProcessInfoImpl() {
+    env = new HashMap<String, String>();
+    props = new HashMap<String, String>();
+  }
+
+  /**
+   * Construct a concrete process information object. <br/>
+   * 
+   * @param threadCount
+   *          count of active threads.
+   * @param currentTime
+   *          current time on the daemon clock in milliseconds.
+   * @param freemem
+   *          free memory in the daemon VM.
+   * @param maxmem
+   *          maximum heap size configured for the daemon VM.
+   * @param totmem
+   *          total memory of the daemon VM.
+   * @param env
+   *          environment variable list.
+   * @param props
+   *          system properties of the daemon VM.
+   */
+  public ProcessInfoImpl(int threadCount, long currentTime, long freemem,
+      long maxmem, long totmem, Map<String, String> env, 
+      Map<String, String> props) {
+    this.threadCount = threadCount;
+    this.currentTime = currentTime;
+    this.freemem = freemem;
+    this.maxmem = maxmem;
+    this.totmem = totmem;
+    this.env = env;
+    this.props = props;
+  }
+
+  @Override
+  public int activeThreadCount() {
+    return threadCount;
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    return currentTime;
+  }
+
+  @Override
+  public long freeMemory() {
+    return freemem;
+  }
+
+  @Override
+  public Map<String, String> getEnv() {
+    return env;
+  }
+
+  @Override
+  public Map<String,String> getSystemProperties() {
+    return props;
+  }
+
+  @Override
+  public long maxMemory() {
+    return maxmem;
+  }
+
+  @Override
+  public long totalMemory() {
+    return totmem;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.threadCount = in.readInt();
+    this.currentTime = in.readLong();
+    this.freemem = in.readLong();
+    this.maxmem = in.readLong();
+    this.totmem = in.readLong();
+    read(in, env);
+    read(in, props);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(threadCount);
+    out.writeLong(currentTime);
+    out.writeLong(freemem);
+    out.writeLong(maxmem);
+    out.writeLong(totmem);
+    write(out, env);
+    write(out, props);
+  }
+
+  private void read(DataInput in, Map<String, String> map) throws IOException {
+    // 'size' counts both keys and values, so each iteration of the loop
+    // consumes one key/value pair.
+    int size = in.readInt();
+    for (int i = 0; i < size; i = i + 2) {
+      String key = in.readUTF();
+      String value = in.readUTF();
+      map.put(key, value);
+    }
+  }
+
+  private void write(DataOutput out, Map<String, String> map) 
+  throws IOException {
+    int size = (map.size() * 2);
+    out.writeInt(size);
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      out.writeUTF(entry.getKey());
+      out.writeUTF(entry.getValue());
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer strBuf = new StringBuffer();
+    strBuf.append(String.format("active threads : %d\n", threadCount));
+    strBuf.append(String.format("current time  : %d\n", currentTime));
+    strBuf.append(String.format("free memory  : %d\n", freemem));
+    strBuf.append(String.format("total memory  : %d\n", totmem));
+    strBuf.append(String.format("max memory  : %d\n", maxmem));
+    strBuf.append("Environment Variables : \n");
+    for (Map.Entry<String, String> entry : env.entrySet()) {
+      strBuf.append(String.format("key : %s value : %s \n", entry.getKey(),
+          entry.getValue()));
+    }
+    return strBuf.toString();
+  }
+
+}
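
A compact sketch (not part of the commit) of the Writable round-trip that the write/readFields methods above implement, using standard java.io streams:

    // Hypothetical snippet: serialize and deserialize a ProcessInfoImpl.
    ProcessInfoImpl info = new ProcessInfoImpl(8, System.currentTimeMillis(),
        1L << 20, 1L << 30, 1L << 28, System.getenv(),
        new HashMap<String, String>());
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    info.write(new DataOutputStream(bytes));
    ProcessInfoImpl copy = new ProcessInfoImpl();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    // copy.activeThreadCount() == 8 and copy.getEnv() equals the original map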

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,71 @@
+package org.apache.hadoop.test.system.process;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Interface to manage the remote processes in the master-slave cluster.
+ */
+public interface ClusterProcessManager {
+
+  /**
+   * The configuration key to specify the concrete implementation of the
+   * {@link ClusterProcessManager} to be used by
+   * {@link ClusterProcessManagerFactory}.
+   */
+  String IMPL_CLASS = "test.system.clusterprocessmanager.impl.class";
+
+  /**
+   * Enumeration used to specify the types of the clusters which are supported
+   * by the concrete implementations of {@link ClusterProcessManager}.
+   */
+  public enum ClusterType {
+    MAPRED, HDFS
+  }
+  
+  /**
+   * Initializes the manager with the type of the cluster to manage and the
+   * configuration that the ClusterProcessManager needs to manage it.<br/>
+   * The configuration should typically contain all the parameters which are 
+   * required by the implementations.<br/>
+   *  
+   * @param t type of the cluster to be managed.
+   * @param conf configuration containing values of the specific keys which 
+   * are required by the implementation of the cluster process manager.
+   * 
+   * @throws Exception when initialization fails.
+   */
+  void init(ClusterType t, Configuration conf) throws Exception;
+
+  /**
+   * Returns the handle used to manage the master daemon.<br/>
+   * 
+   * @return master daemon process.
+   */
+  RemoteProcess getMaster();
+
+  /**
+   * Returns the handles used to manage the slave daemons.<br/>
+   * 
+   * @return map of slave hosts to slave daemon process.
+   */
+  Map<String, RemoteProcess> getSlaves();
+
+  /**
+   * Starts the cluster, i.e. the master and all the slaves.<br/>
+   * 
+   * @throws IOException if the startup procedure fails.
+   */
+  void start() throws IOException;
+
+  /**
+   * Stops the cluster, i.e. the master and all the slaves.<br/>
+   * 
+   * @throws IOException if the shutdown procedure fails.
+   */
+  void stop() throws IOException;
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManagerFactory.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManagerFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManagerFactory.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,35 @@
+package org.apache.hadoop.test.system.process;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.system.process.ClusterProcessManager.ClusterType;
+
+/**
+ * Factory to create ClusterProcessManager handle.
+ */
+public class ClusterProcessManagerFactory {
+
+  /**
+   * Factory method to create the {@link ClusterProcessManager} based on the
+   * {@code ClusterProcessManager.IMPL_CLASS} value. <br/>
+   * 
+   * @param t type of the cluster to be managed by the instance.
+   * @param conf the configuration required by the instance for 
+   * management of the cluster.
+   * @return instance of the cluster process manager to be used for management.
+   * 
+   * @throws Exception if the implementation class cannot be instantiated or
+   * its initialization fails.
+   */
+  public static ClusterProcessManager createInstance(ClusterType t,
+      Configuration conf) throws Exception {
+    String implKlass = conf.get(ClusterProcessManager.IMPL_CLASS, System
+        .getProperty(ClusterProcessManager.IMPL_CLASS));
+    if (implKlass == null || implKlass.isEmpty()) {
+      implKlass = HadoopDaemonRemoteCluster.class.getName();
+    }
+    Class<? extends ClusterProcessManager> klass =
+        Class.forName(implKlass).asSubclass(ClusterProcessManager.class);
+    ClusterProcessManager k = klass.newInstance();
+    k.init(t, conf);
+    return k;
+  }
+}
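
A caller would normally obtain a manager through this factory rather than constructing one directly. A minimal sketch, assuming the default HadoopDaemonRemoteCluster implementation; the driver class and paths below are placeholders, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.system.process.ClusterProcessManager;
import org.apache.hadoop.test.system.process.ClusterProcessManager.ClusterType;
import org.apache.hadoop.test.system.process.ClusterProcessManagerFactory;
import org.apache.hadoop.test.system.process.RemoteProcess;

public class ClusterManagerExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Keys defined by HadoopDaemonRemoteCluster; the values are placeholders.
    conf.set("test.system.hdrc.hadoophome", "/opt/hadoop");
    conf.set("test.system.hdrc.hadoopconfdir", "/opt/hadoop/conf");

    // Resolves test.system.clusterprocessmanager.impl.class (falling back to
    // HadoopDaemonRemoteCluster) and calls init(t, conf) on the new instance.
    ClusterProcessManager manager =
        ClusterProcessManagerFactory.createInstance(ClusterType.MAPRED, conf);

    manager.start();  // starts the master first, then every slave
    for (RemoteProcess slave : manager.getSlaves().values()) {
      System.out.println("tasktracker on " + slave.getHostName());
    }
    manager.stop();   // stops the master and the slaves
  }
}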

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,274 @@
+package org.apache.hadoop.test.system.process;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * The concrete class which implements the start up and shut down routines
+ * based on hadoop-daemon.sh. <br/>
+ * 
+ * The class requires two keys to be present in the Configuration objects passed
+ * to it. Look at <code>CONF_HADOOPHOME</code> and
+ * <code>CONF_HADOOPCONFDIR</code> for the names of the
+ * configuration keys.
+ * 
+ * The final command executed on each host has the following format: 
+ * <br/>
+ * <code>
+ *  ssh host 'hadoop-home/bin/hadoop-daemon.sh 
+ *  --config HADOOP_CONF_DIR (start|stop) daemonCommand'
+ * </code>
+ */
+public class HadoopDaemonRemoteCluster implements ClusterProcessManager {
+
+  private static final Log LOG = LogFactory
+      .getLog(HadoopDaemonRemoteCluster.class.getName());
+
+  /**
+   * Key used to configure the HADOOP_HOME to be used by the
+   * HadoopDaemonRemoteCluster.
+   */
+  public final static String CONF_HADOOPHOME = "test.system.hdrc.hadoophome";
+  /**
+   * Key used to configure the HADOOP_CONF_DIR to be used by the
+   * HadoopDaemonRemoteCluster.
+   */
+  public final static String CONF_HADOOPCONFDIR = 
+    "test.system.hdrc.hadoopconfdir";
+
+  /**
+   * Key used to configure the HADOOP_CONF_DIR of the deployed cluster, from
+   * which the masters and slaves files are read. Defaults to the value of
+   * <code>CONF_HADOOPCONFDIR</code> when not set.
+   */
+  public final static String CONF_DEPLOYED_HADOOPCONFDIR =
+    "test.system.hdrc.deployed.hadoopconfdir";
+
+  private String hadoopHome;
+  private String hadoopConfDir;
+  private String deployed_hadoopConfDir;
+  private String masterCommand;
+  private String slaveCommand;
+
+  private RemoteProcess master;
+  private Map<String, RemoteProcess> slaves;
+
+  @Override
+  public void init(ClusterType t, Configuration conf) throws Exception {
+    /*
+     * Initialization of the HadoopDaemonRemoteCluster is a three stage
+     * process:
+     * 1. Populate the script names based on the type of the passed cluster.
+     * 2. Populate the required directories.
+     * 3. Populate the master and the slaves.
+     */
+    populateScriptNames(t);
+    populateDirectories(conf);
+    this.slaves = new HashMap<String, RemoteProcess>();
+    populateDaemons(deployed_hadoopConfDir);
+  }
+
+  /**
+   * Method to populate the required master and slave commands which are used to
+   * manage the cluster.<br/>
+   * 
+   * @param t
+   *          type of cluster to be initialized.
+   * 
+   * @throws UnsupportedOperationException
+   *           if the passed cluster type is not MAPRED or HDFS
+   */
+  private void populateScriptNames(ClusterType t) {
+    switch (t) {
+    case MAPRED:
+      masterCommand = "jobtracker";
+      slaveCommand = "tasktracker";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Created mapred hadoop daemon remote cluster manager with "
+            + "masterCommand: jobtracker, slaveCommand: tasktracker");
+      }
+      break;
+    case HDFS:
+      masterCommand = "namenode";
+      slaveCommand = "datanode";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Created hdfs hadoop daemon remote cluster manager with "
+            + "masterCommand: namenode, slaveCommand: datanode");
+      }
+      break;
+    default:
+      LOG.error("Cluster type : " + t
+          + " is not supported currently by HadoopDaemonRemoteCluster");
+      throw new UnsupportedOperationException(
+          "The specified cluster type is not supported by the " +
+          "HadoopDaemonRemoteCluster");
+    }
+  }
+
+  /**
+   * Method to populate the hadoop home and hadoop configuration directories.
+   * 
+   * @param conf
+   *          Configuration object containing values for
+   *          CONF_HADOOPHOME and
+   *          CONF_HADOOPCONFDIR
+   * 
+   * @throws IllegalArgumentException
+   *           if neither the configuration nor the system properties contain
+   *           values for the required keys.
+   */
+  private void populateDirectories(Configuration conf) {
+    hadoopHome = conf.get(CONF_HADOOPHOME, System
+        .getProperty(CONF_HADOOPHOME));
+    hadoopConfDir = conf.get(CONF_HADOOPCONFDIR, System
+        .getProperty(CONF_HADOOPCONFDIR));
+
+    deployed_hadoopConfDir = conf.get(CONF_DEPLOYED_HADOOPCONFDIR,
+      System.getProperty(CONF_DEPLOYED_HADOOPCONFDIR));
+    if (deployed_hadoopConfDir == null || deployed_hadoopConfDir.isEmpty()) {
+      deployed_hadoopConfDir = hadoopConfDir;
+    }
+
+    if (hadoopHome == null || hadoopConfDir == null || hadoopHome.isEmpty()
+        || hadoopConfDir.isEmpty()) {
+      LOG.error("No configuration "
+          + "for the HADOOP_HOME and HADOOP_CONF_DIR passed");
+      throw new IllegalArgumentException(
+          "No Configuration passed for hadoop home " +
+          "and hadoop conf directories");
+    }
+
+  }
+
+  @Override
+  public RemoteProcess getMaster() {
+    return master;
+  }
+
+  @Override
+  public Map<String, RemoteProcess> getSlaves() {
+    return slaves;
+  }
+
+  @Override
+  public void start() throws IOException {
+    // start master first.
+    master.start();
+    for (RemoteProcess slave : slaves.values()) {
+      slave.start();
+    }
+  }
+
+  @Override
+  public void stop() throws IOException {
+    master.kill();
+    for (RemoteProcess slave : slaves.values()) {
+      slave.kill();
+    }
+  }
+
+  private void populateDaemons(String confLocation) throws IOException {
+    File mastersFile = new File(confLocation, "masters");
+    File slavesFile = new File(confLocation, "slaves");
+    BufferedReader reader = null;
+    try {
+      reader = new BufferedReader(new FileReader(mastersFile));
+      String masterHost = reader.readLine();
+      if (masterHost != null && !masterHost.trim().isEmpty()) {
+        master = new ScriptDaemon(masterCommand, masterHost);
+      }
+    } finally {
+      try {
+        if (reader != null) {
+          reader.close();
+        }
+      } catch (IOException e) {
+        LOG.error("Can't close the masters file from " + confLocation, e);
+      }
+    }
+    try {
+      reader = new BufferedReader(new FileReader(slavesFile));
+      String slaveHost = null;
+      while ((slaveHost = reader.readLine()) != null) {
+        RemoteProcess slave = new ScriptDaemon(slaveCommand, slaveHost);
+        slaves.put(slaveHost, slave);
+      }
+    } finally {
+      try {
+        if (reader != null) {
+          reader.close();
+        }
+      } catch (IOException e) {
+        LOG.error("Can't close the slaves file from " + confLocation, e);
+      }
+    }
+  }
+
+  /**
+   * The core daemon class which implements remote management of the actual
+   * daemon processes in the cluster by running hadoop-daemon.sh over ssh.
+   * 
+   */
+  class ScriptDaemon implements RemoteProcess {
+
+    private static final String STOP_COMMAND = "stop";
+    private static final String START_COMMAND = "start";
+    private static final String SCRIPT_NAME = "hadoop-daemon.sh";
+    private final String daemonName;
+    private final String hostName;
+
+    public ScriptDaemon(String daemonName, String hostName) {
+      this.daemonName = daemonName;
+      this.hostName = hostName;
+    }
+
+    @Override
+    public String getHostName() {
+      return hostName;
+    }
+
+    private ShellCommandExecutor buildCommandExecutor(String command) {
+      String[] commandArgs = getCommand(command);
+      File binDir = getBinDir();
+      HashMap<String, String> env = new HashMap<String, String>();
+      env.put("HADOOP_CONF_DIR", hadoopConfDir);
+      ShellCommandExecutor executor = new ShellCommandExecutor(commandArgs,
+          binDir, env);
+      LOG.info(executor.toString());
+      return executor;
+    }
+
+    private File getBinDir() {
+      File binDir = new File(hadoopHome, "bin");
+      return binDir;
+    }
+
+    private String[] getCommand(String command) {
+      ArrayList<String> cmdArgs = new ArrayList<String>();
+      File binDir = getBinDir();
+      cmdArgs.add("ssh");
+      cmdArgs.add(hostName);
+      cmdArgs.add(binDir.getAbsolutePath() + File.separator + SCRIPT_NAME);
+      cmdArgs.add("--config");
+      cmdArgs.add(hadoopConfDir);
+      // XXX Twenty internal version does not support --script option.
+      cmdArgs.add(command);
+      cmdArgs.add(daemonName);
+      return (String[]) cmdArgs.toArray(new String[cmdArgs.size()]);
+    }
+
+    @Override
+    public void kill() throws IOException {
+      buildCommandExecutor(STOP_COMMAND).execute();
+    }
+
+    @Override
+    public void start() throws IOException {
+      buildCommandExecutor(START_COMMAND).execute();
+    }
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,29 @@
+package org.apache.hadoop.test.system.process;
+
+import java.io.IOException;
+
+/**
+ * Interface to manage the remote process.
+ */
+public interface RemoteProcess {
+  /**
+   * Get the host on which the daemon process is running/stopped.<br/>
+   * 
+   * @return hostname on which process is running/stopped.
+   */
+  String getHostName();
+
+  /**
+   * Start a given daemon process.<br/>
+   * 
+   * @throws IOException if startup fails.
+   */
+  void start() throws IOException;
+
+  /**
+   * Stop a given daemon process.<br/>
+   * 
+   * @throws IOException if shutdown fails.
+   */
+  void kill() throws IOException;
+}
\ No newline at end of file
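
Any class that can start and stop a daemon on a given host can implement this interface; HadoopDaemonRemoteCluster.ScriptDaemon above does so by invoking hadoop-daemon.sh over ssh. The following no-op implementation is purely illustrative of the contract and is not part of the patch:

import java.io.IOException;

import org.apache.hadoop.test.system.process.RemoteProcess;

// A stand-in RemoteProcess that only records state; useful when exercising
// code that consumes the interface without touching a real cluster.
public class NoopRemoteProcess implements RemoteProcess {
  private boolean running = false;

  @Override
  public String getHostName() {
    return "localhost";
  }

  @Override
  public void start() throws IOException {
    running = true;
  }

  @Override
  public void kill() throws IOException {
    running = false;
  }

  public boolean isRunning() {
    return running;
  }
}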

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/testjar/UserNamePermission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/testjar/UserNamePermission.java?rev=1077176&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/testjar/UserNamePermission.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/testjar/UserNamePermission.java Fri Mar  4 03:48:49 2011
@@ -0,0 +1,83 @@
+package testjar;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class UserNamePermission
+{
+
+  private static final Log LOG = LogFactory.getLog(UserNamePermission.class);
+  //This mapper will read the user name and pass in to the reducer
+  public static class UserNameMapper extends Mapper<LongWritable,Text,Text,Text>
+  {
+    Text key1 = new Text("UserName");
+    public void map(LongWritable key, Text value, Context context)
+      throws IOException,InterruptedException {
+      Text val = new Text(System.getProperty("user.name").toString());
+      context.write(key1, val);
+    }
+  }
+
+  //The reducer is responsible for writing the user name to the file
+  //which will be validated by the testcase
+  public static class UserNameReducer extends Reducer<Text,Text,Text,Text>
+  {
+    public void reduce(Text key, Iterable<Text> values,
+      Context context) throws IOException,InterruptedException {
+      LOG.info("The key " + key);
+      Iterator<Text> it = values.iterator();
+      if (it.hasNext()) {
+        Text val = it.next();
+        LOG.info("The value " + val);
+        context.write(key, new Text(System.getProperty("user.name")));
+      }
+    }
+  }
+
+  public static void main(String [] args) throws Exception
+  {
+    Path outDir = new Path("output");
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "user name check");
+
+    job.setJarByClass(UserNamePermission.class);
+    job.setMapperClass(UserNamePermission.UserNameMapper.class);
+    job.setCombinerClass(UserNamePermission.UserNameReducer.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setReducerClass(UserNamePermission.UserNameReducer.class);
+    job.setNumReduceTasks(1);
+
+    job.setInputFormatClass(TextInputFormat.class);
+    TextInputFormat.addInputPath(job, new Path("input"));
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+
+}

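The job above reads whatever text exists under the relative path "input" on the default FileSystem and writes its result under "output". A hypothetical staging step that a test might run beforehand; the class name and file contents are illustrative, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrepareUserNameInput {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // One line is enough: every map call emits ("UserName", user.name)
    // regardless of the record contents.
    FSDataOutputStream out = fs.create(new Path("input/part-0"));
    out.writeBytes("any single line will do\n");
    out.close();
  }
}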

