Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 85591 invoked from network); 4 Mar 2011 03:49:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 03:49:20 -0000 Received: (qmail 12067 invoked by uid 500); 4 Mar 2011 03:49:20 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 12035 invoked by uid 500); 4 Mar 2011 03:49:20 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 12018 invoked by uid 99); 4 Mar 2011 03:49:19 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:49:19 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:49:17 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5071E2388C22; Fri, 4 Mar 2011 03:48:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1077177 - in /hadoop/common/branches/branch-0.20-security-patches/src/test/system: aop/org/apache/hadoop/mapred/ aop/org/apache/hadoop/test/system/ java/org/apache/hadoop/mapred/ java/org/apache/hadoop/mapreduce/test/system/ java/org/apach... Date: Fri, 04 Mar 2011 03:48:57 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304034857.5071E2388C22@eris.apache.org> Author: omalley Date: Fri Mar 4 03:48:56 2011 New Revision: 1077177 URL: http://svn.apache.org/viewvc?rev=1077177&view=rev Log: commit 5fb460785475c22997d4f4fcaa144cb368e7239b Author: Sharad Agarwal Date: Mon Feb 22 13:19:07 2010 +0530 patch from Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ControlAction.java Removed: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRFault.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj?rev=1077177&r1=1077176&r2=1077177&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj Fri Mar 4 03:48:56 2011 @@ -1,8 +1,16 @@ package org.apache.hadoop.mapred; +import java.io.IOException; +import org.apache.hadoop.mapreduce.JobID; + public privileged aspect JobClientAspect { public JobSubmissionProtocol JobClient.getProtocol() { return jobSubmitClient; } + + public void JobClient.killJob(JobID id) throws IOException { + jobSubmitClient.killJob( + org.apache.hadoop.mapred.JobID.downgrade(id)); + } } Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj?rev=1077177&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj Fri Mar 4 03:48:56 2011 @@ -0,0 +1,95 @@ +package org.apache.hadoop.mapred; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.mapred.Task.TaskReporter; +import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction; +import org.apache.hadoop.test.system.ControlAction; +import org.apache.hadoop.test.system.DaemonProtocol; + +public privileged aspect TaskAspect { + + private static final Log LOG = LogFactory.getLog(TaskAspect.class); + + private Object waitObject = new Object(); + private AtomicBoolean isWaitingForSignal = new AtomicBoolean(false); + + private DaemonProtocol daemonProxy; + + pointcut taskDoneIntercept(Task task) : execution( + public void Task.done(..)) && target(task); + + void around(Task task) : taskDoneIntercept(task) { + if(task.isJobCleanupTask() || task.isJobSetupTask() || task.isTaskCleanupTask()) { + proceed(task); + return; + } + Configuration conf = task.getConf(); + boolean controlEnabled = FinishTaskControlAction.isControlActionEnabled(conf); + if(controlEnabled) { + LOG.info("Task control enabled, waiting till client sends signal to " + + "complete"); + try { + synchronized (waitObject) { + isWaitingForSignal.set(true); + waitObject.wait(); + } + } catch (InterruptedException e) { + } + } + proceed(task); + return; + } + + pointcut taskStatusUpdate(TaskReporter reporter, TaskAttemptID id) : + call(public boolean TaskUmbilicalProtocol.ping(TaskAttemptID)) + && this(reporter) && args(id); + + after(TaskReporter reporter, TaskAttemptID id) throws IOException : + taskStatusUpdate(reporter, id) { + synchronized (waitObject) { + if(isWaitingForSignal.get()) { + ControlAction[] actions = daemonProxy.getActions( + id.getTaskID()); + if(actions.length == 0) { + return; + } + boolean shouldProceed = false; + for(ControlAction action : actions) { + if (action instanceof FinishTaskControlAction) { + LOG.info("Recv : Control task action to finish task id: " + + action.getTarget()); + shouldProceed = true; + daemonProxy.removeAction(action); + LOG.info("Removed the control action from TaskTracker"); + break; + } + } + if(shouldProceed) { + LOG.info("Notifying the task to completion"); + waitObject.notify(); + } + } + } + } + + + pointcut rpcInterceptor(Class k, long version,InetSocketAddress addr, + Configuration conf) : call( + public static * RPC.getProxy(Class, long ,InetSocketAddress, + Configuration)) && args(k, version,addr, conf) && + within(org.apache.hadoop.mapred.Child) ; + + after(Class k, long version, InetSocketAddress addr, Configuration conf) + throws IOException : rpcInterceptor(k, version, addr, conf) { + daemonProxy = + (DaemonProtocol) RPC.getProxy( + DaemonProtocol.class, DaemonProtocol.versionID, addr, conf); + } + +} Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj?rev=1077177&r1=1077176&r2=1077177&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj Fri Mar 4 03:48:56 2011 @@ -4,13 +4,13 @@ import java.io.IOException; import java.util.List; import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.test.system.JTProtocol; import org.apache.hadoop.mapreduce.test.system.TTProtocol; import org.apache.hadoop.mapreduce.test.system.TTTaskInfo; import org.apache.hadoop.mapred.TTTaskInfoImpl.MapTTTaskInfo; import org.apache.hadoop.mapred.TTTaskInfoImpl.ReduceTTTaskInfo; +import org.apache.hadoop.test.system.ControlAction; import org.apache.hadoop.test.system.DaemonProtocol; -import org.apache.hadoop.test.system.DaemonProtocolAspect; +import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; public privileged aspect TaskTrackerAspect { @@ -75,4 +75,5 @@ public privileged aspect TaskTrackerAspe return proceed(protocol, clientVersion); } } + } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj?rev=1077177&r1=1077176&r2=1077177&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/aop/org/apache/hadoop/test/system/DaemonProtocolAspect.aj Fri Mar 4 03:48:56 2011 @@ -3,10 +3,11 @@ package org.apache.hadoop.test.system; import java.io.IOException; import java.util.HashMap; import java.util.List; +import java.util.ArrayList; import java.util.Map; import java.util.Properties; - +import org.apache.hadoop.io.Writable; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -22,7 +23,11 @@ import org.apache.hadoop.conf.Configurat public aspect DaemonProtocolAspect { private boolean DaemonProtocol.ready; - + + @SuppressWarnings("unchecked") + private HashMap> DaemonProtocol.actions = + new HashMap>(); + /** * Set if the daemon process is ready or not, concrete daemon protocol should * implement pointcuts to determine when the daemon is ready and use the @@ -140,4 +145,66 @@ public aspect DaemonProtocolAspect { } return fs; } + + + @SuppressWarnings("unchecked") + public ControlAction[] DaemonProtocol.getActions(Writable key) + throws IOException { + synchronized (actions) { + List actionList = actions.get(key); + if(actionList == null) { + return new ControlAction[0]; + } else { + return (ControlAction[]) actionList.toArray(new ControlAction[actionList + .size()]); + } + } + } + + + @SuppressWarnings("unchecked") + public void DaemonProtocol.sendAction(ControlAction action) + throws IOException { + synchronized (actions) { + List actionList = actions.get(action.getTarget()); + if(actionList == null) { + actionList = new ArrayList(); + actions.put(action.getTarget(), actionList); + } + actionList.add(action); + } + } + + @SuppressWarnings("unchecked") + public boolean DaemonProtocol.isActionPending(ControlAction action) + throws IOException{ + synchronized (actions) { + List actionList = actions.get(action.getTarget()); + if(actionList == null) { + return false; + } else { + return actionList.contains(action); + } + } + } + + + @SuppressWarnings("unchecked") + public void DaemonProtocol.removeAction(ControlAction action) + throws IOException { + synchronized (actions) { + List actionList = actions.get(action.getTarget()); + if(actionList == null) { + return; + } else { + actionList.remove(action); + } + } + } + + public void DaemonProtocol.clearActions() throws IOException { + synchronized (actions) { + actions.clear(); + } + } } Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java?rev=1077177&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java Fri Mar 4 03:48:56 2011 @@ -0,0 +1,90 @@ +package org.apache.hadoop.mapred; + + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.examples.SleepJob; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction; +import org.apache.hadoop.mapreduce.test.system.JTProtocol; +import org.apache.hadoop.mapreduce.test.system.JobInfo; +import org.apache.hadoop.mapreduce.test.system.MRCluster; +import org.apache.hadoop.mapreduce.test.system.TTClient; +import org.apache.hadoop.mapreduce.test.system.TaskInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestControlledJob { + private MRCluster cluster; + + private static final Log LOG = LogFactory.getLog(TestControlledJob.class); + + public TestControlledJob() throws Exception { + cluster = MRCluster.createCluster(new Configuration()); + } + + @Before + public void before() throws Exception { + cluster.setUp(); + } + + @After + public void after() throws Exception { + cluster.tearDown(); + } + + @Test + public void testControlledJob() throws Exception { + Configuration conf = new Configuration(cluster.getConf()); + JTProtocol wovenClient = cluster.getMaster().getProxy(); + FinishTaskControlAction.configureControlActionForJob(conf); + SleepJob job = new SleepJob(); + job.setConf(conf); + + conf = job.setupJobConf(1, 0, 100, 100, 100, 100); + JobClient client = cluster.getMaster().getClient(); + + RunningJob rJob = client.submitJob(new JobConf(conf)); + JobID id = rJob.getID(); + + JobInfo jInfo = wovenClient.getJobInfo(id); + + while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) { + Thread.sleep(1000); + jInfo = wovenClient.getJobInfo(id); + } + + LOG.info("Waiting till job starts running one map"); + jInfo = wovenClient.getJobInfo(id); + Assert.assertEquals(jInfo.runningMaps(), 1); + + LOG.info("waiting for another cycle to " + + "check if the maps dont finish off"); + Thread.sleep(1000); + jInfo = wovenClient.getJobInfo(id); + Assert.assertEquals(jInfo.runningMaps(), 1); + + TaskInfo[] taskInfos = wovenClient.getTaskInfo(id); + + for(TaskInfo info : taskInfos) { + LOG.info("constructing control action to signal task to finish"); + FinishTaskControlAction action = new FinishTaskControlAction( + TaskID.downgrade(info.getTaskID())); + for(TTClient cli : cluster.getSlaves().values()) { + cli.getProxy().sendAction(action); + } + } + + jInfo = wovenClient.getJobInfo(id); + while(!jInfo.getStatus().isJobComplete()) { + Thread.sleep(1000); + jInfo = wovenClient.getJobInfo(id); + } + + LOG.info("Job sucessfully completed after signalling!!!!"); + } +} Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java?rev=1077177&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java Fri Mar 4 03:48:56 2011 @@ -0,0 +1,52 @@ +package org.apache.hadoop.mapreduce.test.system; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.TaskID; +import org.apache.hadoop.test.system.ControlAction; + +/** + * Control Action which signals a controlled task to proceed to completion.
+ */ +public class FinishTaskControlAction extends ControlAction { + + private static final String ENABLE_CONTROLLED_TASK_COMPLETION = + "test.system.enabled.task.completion.control"; + + /** + * Create a default control action.
+ * + */ + public FinishTaskControlAction() { + super(new TaskID()); + } + + /** + * Create a control action specific to a particular task.
+ * + * @param id + * of the task. + */ + public FinishTaskControlAction(TaskID id) { + super(id); + } + + /** + * Sets up the job to be controlled using the finish task control action. + *
+ * + * @param conf + * configuration to be used submit the job. + */ + public static void configureControlActionForJob(Configuration conf) { + conf.setBoolean(ENABLE_CONTROLLED_TASK_COMPLETION, true); + } + + /** + * Checks if the control action is enabled in the passed configuration.
+ * @param conf configuration + * @return true if action is enabled. + */ + public static boolean isControlActionEnabled(Configuration conf) { + return conf.getBoolean(ENABLE_CONTROLLED_TASK_COMPLETION, false); + } +} Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java?rev=1077177&r1=1077176&r2=1077177&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java Fri Mar 4 03:48:56 2011 @@ -82,6 +82,15 @@ public class JTClient extends MRDaemonCl public Configuration getJobTrackerConfig() throws IOException { return getProxy().getDaemonConf(); } + + /** + * Kills the job.
+ * @param id of the job to be killed. + * @throws IOException + */ + public void killJob(JobID id) throws IOException { + getClient().killJob(id); + } /** * Verification API to check running jobs and running job states. Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java?rev=1077177&r1=1077176&r2=1077177&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java Fri Mar 4 03:48:56 2011 @@ -68,5 +68,11 @@ public class MRCluster extends AbstractM public void ensureClean() throws IOException { //TODO: ensure that no jobs/tasks are running //restart the cluster if cleanup fails + JTClient jtClient = getMaster(); + JobInfo[] jobs = jtClient.getProxy().getAllJobInfo(); + for(JobInfo job : jobs) { + jtClient.getClient().killJob( + org.apache.hadoop.mapred.JobID.downgrade(job.getID())); + } } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java?rev=1077177&r1=1077176&r2=1077177&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonClient.java Fri Mar 4 03:48:56 2011 @@ -32,6 +32,11 @@ public abstract class AbstractDaemonClie this.process = process; } + /** + * Gets if the client is connected to the Daemon
+ * + * @return true if connected. + */ public boolean isConnected() { return connected; } @@ -40,8 +45,17 @@ public abstract class AbstractDaemonClie this.connected = connected; } + /** + * Create an RPC proxy to the daemon
+ * + * @throws IOException + */ public abstract void connect() throws IOException; + /** + * Disconnect the underlying RPC proxy to the daemon.
+ * @throws IOException + */ public abstract void disconnect() throws IOException; /** @@ -51,51 +65,113 @@ public abstract class AbstractDaemonClie */ protected abstract PROXY getProxy(); + /** + * Gets the daemon level configuration.
+ * + * @return configuration using which daemon is running + */ public Configuration getConf() { return conf; } + /** + * Gets the host on which Daemon is currently running.
+ * + * @return hostname + */ public String getHostName() { return process.getHostName(); } + /** + * Gets if the Daemon is ready to accept RPC connections.
+ * + * @return true if daemon is ready. + * @throws IOException + */ public boolean isReady() throws IOException { return getProxy().isReady(); } + /** + * Kills the Daemon process
+ * @throws IOException + */ public void kill() throws IOException { process.kill(); } + /** + * Checks if the Daemon process is alive or not
+ * + * @throws IOException + */ public void ping() throws IOException { getProxy().ping(); } + /** + * Start up the Daemon process.
+ * @throws IOException + */ public void start() throws IOException { process.start(); } + /** + * Get system level view of the Daemon process. + * + * @return returns system level view of the Daemon process. + * + * @throws IOException + */ public ProcessInfo getProcessInfo() throws IOException { return getProxy().getProcessInfo(); } - public void enable(List> faults) throws IOException { - getProxy().enable(faults); - } - - public void disableAll() throws IOException { - getProxy().disableAll(); - } - + /** + * Return a file status object that represents the path. + * @param path + * given path + * @param local + * whether the path is local or not + * @return a FileStatus object + * @throws FileNotFoundException when the path does not exist; + * IOException see specific implementation + */ public FileStatus getFileStatus(String path, boolean local) throws IOException { return getProxy().getFileStatus(path, local); } + /** + * List the statuses of the files/directories in the given path if the path is + * a directory. + * + * @param path + * given path + * @param local + * whether the path is local or not + * @return the statuses of the files/directories in the given patch + * @throws IOException + */ public FileStatus[] listStatus(String path, boolean local) throws IOException { return getProxy().listStatus(path, local); } + /** + * List the statuses of the files/directories in the given path if the path is + * a directory recursive/nonrecursively depending on parameters + * + * @param path + * given path + * @param local + * whether the path is local or not + * @param recursive + * whether to recursively get the status + * @return the statuses of the files/directories in the given patch + * @throws IOException + */ public FileStatus[] listStatus(String f, boolean local, boolean recursive) throws IOException { List status = new ArrayList(); @@ -118,4 +194,5 @@ public abstract class AbstractDaemonClie } } } + } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java?rev=1077177&r1=1077176&r2=1077177&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java Fri Mar 4 03:48:56 2011 @@ -4,7 +4,6 @@ import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; @@ -236,48 +235,6 @@ public abstract class AbstractMasterSlav } /** - * Enable/Inject the faults. In case fault can't be enabled on ALL nodes - * cluster is restarted. - */ - public void enable(List> faults) throws IOException { - try { - enableFaults(faults); - } catch (IOException e) { - stop(); - start(); - enableFaults(faults); - } - } - - /** - * Disable/Remove the all the faults. In case fault can't be disabled on ALL - * nodes cluster is restarted. - */ - public void disableAllFaults() throws IOException { - try { - disableFaults(); - } catch (IOException e) { - stop(); - start(); - disableFaults(); - } - } - - private void enableFaults(List> faults) throws IOException { - master.enable(faults); - for (SLAVE slave : slaves.values()) { - slave.enable(faults); - } - } - - private void disableFaults() throws IOException { - master.disableAll(); - for (SLAVE slave : slaves.values()) { - slave.disableAll(); - } - } - - /** * Ping all the daemons of the cluster. * @throws IOException */ @@ -302,6 +259,7 @@ public abstract class AbstractMasterSlav } connect(); ping(); + clearAllControlActions(); ensureClean(); } @@ -313,11 +271,22 @@ public abstract class AbstractMasterSlav } /** + * Clears all the pending control actions in the cluster.
+ * @throws IOException + */ + public void clearAllControlActions() throws IOException { + master.getProxy().clearActions(); + for (SLAVE slave : getSlaves().values()) { + slave.getProxy().clearActions(); + } + } + /** * Ensure that cluster is clean. Disconnect from the RPC ports of the daemons. * @throws IOException */ public void tearDown() throws IOException { ensureClean(); + clearAllControlActions(); disconnect(); } } Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ControlAction.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ControlAction.java?rev=1077177&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ControlAction.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/ControlAction.java Fri Mar 4 03:48:56 2011 @@ -0,0 +1,68 @@ +package org.apache.hadoop.test.system; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * Class to represent a control action which can be performed on Daemon.
+ * + */ + +public abstract class ControlAction implements Writable { + + private T target; + + /** + * Default constructor of the Control Action, sets the Action type to zero.
+ */ + public ControlAction() { + } + + /** + * Constructor which sets the type of the Control action to a specific type.
+ * + * @param type + * of the control action. + */ + public ControlAction(T target) { + this.target = target; + } + + /** + * Gets the id of the control action
+ * + * @return target of action + */ + public T getTarget() { + return target; + } + + @Override + public void readFields(DataInput in) throws IOException { + target.readFields(in); + } + + @Override + public void write(DataOutput out) throws IOException { + target.write(out); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ControlAction) { + ControlAction other = (ControlAction) obj; + return (this.target.equals(other.getTarget())); + } else { + return false; + } + } + + + @Override + public String toString() { + return "Action Target : " + this.target; + } +} Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java?rev=1077177&r1=1077176&r2=1077177&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/DaemonProtocol.java Fri Mar 4 03:48:56 2011 @@ -1,11 +1,11 @@ package org.apache.hadoop.test.system; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.VersionedProtocol; /** @@ -45,24 +45,7 @@ public interface DaemonProtocol extends * @throws IOException */ ProcessInfo getProcessInfo() throws IOException; - - /** - * Enable the set of specified faults in the Daemon.
- * - * @param faults - * list of faults to be enabled. - * - * @throws IOException - */ - void enable(List> faults) throws IOException; - - /** - * Disable all the faults which are enabled in the Daemon.
- * - * @throws IOException - */ - void disableAll() throws IOException; - + /** * Return a file status object that represents the path. * @param path @@ -87,4 +70,56 @@ public interface DaemonProtocol extends * @throws IOException */ FileStatus[] listStatus(String path, boolean local) throws IOException; + + /** + * Enables a particular control action to be performed on the Daemon
+ * + * @param control action to be enabled. + * + * @throws IOException + */ + @SuppressWarnings("unchecked") + void sendAction(ControlAction action) throws IOException; + + /** + * Checks if the particular control action has be delivered to the Daemon + * component
+ * + * @param action to be checked. + * + * @return true if action is still in waiting queue of + * actions to be delivered. + * @throws IOException + */ + @SuppressWarnings("unchecked") + boolean isActionPending(ControlAction action) throws IOException; + + /** + * Removes a particular control action from the list of the actions which the + * daemon maintains.
+ * Not to be directly called by Test Case or clients. + * @param action to be removed + * @throws IOException + */ + + @SuppressWarnings("unchecked") + void removeAction(ControlAction action) throws IOException; + + /** + * Clears out the list of control actions on the particular daemon. + *
+ * @throws IOException + */ + void clearActions() throws IOException; + + /** + * Gets a list of pending actions which are targeted on the specified key. + *
+ * Not to be directly used by clients + * @param key target + * @return list of actions. + * @throws IOException + */ + @SuppressWarnings("unchecked") + ControlAction[] getActions(Writable key) throws IOException; } \ No newline at end of file