Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 94979 invoked from network); 4 Mar 2011 03:58:02 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 03:58:02 -0000 Received: (qmail 54446 invoked by uid 500); 4 Mar 2011 03:58:01 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 54417 invoked by uid 500); 4 Mar 2011 03:58:01 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 54400 invoked by uid 99); 4 Mar 2011 03:58:01 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:58:01 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:57:58 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A8A862388B3A; Fri, 4 Mar 2011 03:57:38 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1077263 - in /hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop: mapred/ mapreduce/test/system/ test/system/ test/system/process/ Date: Fri, 04 Mar 2011 03:57:38 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304035738.A8A862388B3A@eris.apache.org> Author: omalley Date: Fri Mar 4 03:57:38 2011 New Revision: 1077263 URL: http://svn.apache.org/viewvc?rev=1077263&view=rev Log: commit 60fd4d56096ea4e5a37e0f3b48223a59ec450eae Author: Sharad Agarwal Date: Tue Mar 2 10:45:50 2010 +0530 
patch from Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java Removed: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractMasterSlaveCluster.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManagerFactory.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java?rev=1077263&r1=1077262&r2=1077263&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java (original) +++ 
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestCluster.java Fri Mar 4 03:57:38 2011 @@ -19,7 +19,6 @@ import org.apache.hadoop.mapreduce.test. import org.apache.hadoop.mapreduce.test.system.TTInfo; import org.apache.hadoop.mapreduce.test.system.TTTaskInfo; import org.apache.hadoop.mapreduce.test.system.TaskInfo; -import org.apache.hadoop.net.NetUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -47,35 +46,33 @@ public class TestCluster { @Test public void testProcessInfo() throws Exception { - LOG.info("Process info of master is : " - + cluster.getMaster().getProcessInfo()); - Assert.assertNotNull(cluster.getMaster().getProcessInfo()); - Collection slaves = cluster.getSlaves().values(); - for (TTClient slave : slaves) { - LOG.info("Process info of slave is : " + slave.getProcessInfo()); - Assert.assertNotNull(slave.getProcessInfo()); + LOG.info("Process info of JobTracker is : " + + cluster.getJTClient().getProcessInfo()); + Assert.assertNotNull(cluster.getJTClient().getProcessInfo()); + Collection tts = cluster.getTTClients(); + for (TTClient tt : tts) { + LOG.info("Process info of TaskTracker is : " + tt.getProcessInfo()); + Assert.assertNotNull(tt.getProcessInfo()); } } @Test public void testJobSubmission() throws Exception { Configuration conf = new Configuration(cluster.getConf()); - JTProtocol wovenClient = cluster.getMaster().getProxy(); - JobInfo[] jobs = wovenClient.getAllJobInfo(); SleepJob job = new SleepJob(); job.setConf(conf); conf = job.setupJobConf(1, 1, 100, 100, 100, 100); - RunningJob rJob = cluster.getMaster().submitAndVerifyJob(conf); - cluster.getMaster().verifyJobHistory(rJob.getID()); + RunningJob rJob = cluster.getJTClient().submitAndVerifyJob(conf); + cluster.getJTClient().verifyJobHistory(rJob.getID()); } @Test public void testFileStatus() throws Exception { - JTClient jt = cluster.getMaster(); + JTClient jt = cluster.getJTClient(); String dir 
= "."; checkFileStatus(jt.getFileStatus(dir, true)); checkFileStatus(jt.listStatus(dir, false, true), dir); - for (TTClient tt : cluster.getSlaves().values()) { + for (TTClient tt : cluster.getTTClients()) { String[] localDirs = tt.getMapredLocalDirs(); for (String localDir : localDirs) { checkFileStatus(tt.listStatus(localDir, true, false), localDir); @@ -118,13 +115,13 @@ public class TestCluster { @Test public void testTaskStatus() throws Exception { Configuration conf = new Configuration(cluster.getConf()); - JTProtocol wovenClient = cluster.getMaster().getProxy(); + JTProtocol wovenClient = cluster.getJTClient().getProxy(); FinishTaskControlAction.configureControlActionForJob(conf); SleepJob job = new SleepJob(); job.setConf(conf); conf = job.setupJobConf(1, 0, 100, 100, 100, 100); - JobClient client = cluster.getMaster().getClient(); + JobClient client = cluster.getJTClient().getClient(); RunningJob rJob = client.submitJob(new JobConf(conf)); JobID id = rJob.getID(); @@ -144,8 +141,7 @@ public class TestCluster { String[] taskTrackers = info.getTaskTrackers(); for(String taskTracker : taskTrackers) { TTInfo ttInfo = wovenClient.getTTInfo(taskTracker); - TTClient ttCli = cluster.getSlaves().get( - ttInfo.getStatus().getHost()); + TTClient ttCli = cluster.getTTClient(ttInfo.getStatus().getHost()); TTTaskInfo ttTaskInfo = ttCli.getProxy().getTask(info.getTaskID()); Assert.assertNotNull(ttTaskInfo); FinishTaskControlAction action = new FinishTaskControlAction( Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java?rev=1077263&r1=1077262&r2=1077263&view=diff ============================================================================== --- 
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestControlledJob.java Fri Mar 4 03:57:38 2011 @@ -40,13 +40,13 @@ public class TestControlledJob { @Test public void testControlledJob() throws Exception { Configuration conf = new Configuration(cluster.getConf()); - JTProtocol wovenClient = cluster.getMaster().getProxy(); + JTProtocol wovenClient = cluster.getJTClient().getProxy(); FinishTaskControlAction.configureControlActionForJob(conf); SleepJob job = new SleepJob(); job.setConf(conf); conf = job.setupJobConf(1, 0, 100, 100, 100, 100); - JobClient client = cluster.getMaster().getClient(); + JobClient client = cluster.getJTClient().getClient(); RunningJob rJob = client.submitJob(new JobConf(conf)); JobID id = rJob.getID(); @@ -74,7 +74,7 @@ public class TestControlledJob { LOG.info("constructing control action to signal task to finish"); FinishTaskControlAction action = new FinishTaskControlAction( TaskID.downgrade(info.getTaskID())); - for(TTClient cli : cluster.getSlaves().values()) { + for(TTClient cli : cluster.getTTClients()) { cli.getProxy().sendAction(action); } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java?rev=1077263&r1=1077262&r2=1077263&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestSortValidate.java Fri Mar 4 03:57:38 2011 @@ -64,7 +64,7 @@ public class 
TestSortValidate { @Before public void setUp() throws java.lang.Exception { cluster.setUp(); - client = cluster.getMaster().getClient(); + client = cluster.getJTClient().getClient(); dfs = client.getFs(); dfs.delete(SORT_INPUT_PATH, true); @@ -97,7 +97,7 @@ public class TestSortValidate { int prevJobsNum = 0; // JTProtocol wovenClient - JTProtocol wovenClient = cluster.getMaster().getProxy(); + JTProtocol wovenClient = cluster.getJTClient().getProxy(); // JobStatus JobStatus[] jobStatus = null; @@ -141,7 +141,7 @@ public class TestSortValidate { jInfo = wovenClient.getJobInfo(id); } - cluster.getMaster().verifyCompletedJob(id); + cluster.getJTClient().verifyCompletedJob(id); } private void runSort(Configuration job, Path sortInput, Path sortOutput) Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java?rev=1077263&r1=1077262&r2=1077263&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskOwner.java Fri Mar 4 03:57:38 2011 @@ -54,7 +54,7 @@ public class TestTaskOwner { cluster = MRCluster.createCluster(new Configuration()); cluster.setUp(); - FileSystem fs = inDir.getFileSystem(cluster.getMaster().getConf()); + FileSystem fs = inDir.getFileSystem(cluster.getJTClient().getConf()); fs.create(inDir); } @@ -65,7 +65,7 @@ public class TestTaskOwner { // in the cluster and we will authenticate whether matches // with the job that is submitted by the same user. 
- Configuration conf = cluster.getMaster().getConf(); + Configuration conf = cluster.getJTClient().getConf(); Job job = new Job(conf, "user name check"); job.setJarByClass(UserNamePermission.class); @@ -119,7 +119,7 @@ public class TestTaskOwner { @AfterClass public static void tearDown() throws java.lang.Exception { - FileSystem fs = outDir.getFileSystem(cluster.getMaster().getConf()); + FileSystem fs = outDir.getFileSystem(cluster.getJTClient().getConf()); fs.delete(outDir, true); cluster.tearDown(); } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java?rev=1077263&r1=1077262&r2=1077263&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java Fri Mar 4 03:57:38 2011 @@ -1,78 +1,118 @@ package org.apache.hadoop.mapreduce.test.system; import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.test.system.AbstractMasterSlaveCluster; +import org.apache.hadoop.test.system.AbstractDaemonClient; +import org.apache.hadoop.test.system.AbstractDaemonCluster; import org.apache.hadoop.test.system.process.ClusterProcessManager; -import org.apache.hadoop.test.system.process.ClusterProcessManagerFactory; +import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster; import org.apache.hadoop.test.system.process.RemoteProcess; 
-import org.apache.hadoop.test.system.process.ClusterProcessManager.ClusterType; /** - * Concrete MasterSlaveCluster representing a Map-Reduce cluster. + * Concrete AbstractDaemonCluster representing a Map-Reduce cluster. * */ -public class MRCluster extends AbstractMasterSlaveCluster { +@SuppressWarnings("unchecked") +public class MRCluster extends AbstractDaemonCluster { private static final Log LOG = LogFactory.getLog(MRCluster.class); + public static final String CLUSTER_PROCESS_MGR_IMPL = + "test.system.mr.clusterprocess.impl.class"; + protected enum Role {JT, TT}; + private MRCluster(Configuration conf, ClusterProcessManager rCluster) throws IOException { super(conf, rCluster); } /** - * Creates an instance of the Map-Reduce cluster.
- * Example usage:
- * - * Configuration conf = new Configuration();
- * conf.set(ClusterProcessManager.IMPL_CLASS, - * org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster. - * class.getName())
- * conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPHOME, - * "/path");
- * conf.set(HadoopDaemonRemoteCluster.CONF_HADOOPCONFDIR, - * "/path");
- * MRCluster cluster = MRCluster.createCluster(conf); - *
+ * Factory method to create an instance of the Map-Reduce cluster.
* * @param conf * contains all required parameter to create cluster. * @return a cluster instance to be managed. - * @throws IOException * @throws Exception */ public static MRCluster createCluster(Configuration conf) - throws IOException, Exception { - return new MRCluster(conf, ClusterProcessManagerFactory.createInstance( - ClusterType.MAPRED, conf)); + throws Exception { + String implKlass = conf.get(CLUSTER_PROCESS_MGR_IMPL, System + .getProperty(CLUSTER_PROCESS_MGR_IMPL)); + if (implKlass == null || implKlass.isEmpty()) { + implKlass = MRProcessManager.class.getName(); + } + Class klass = (Class) Class + .forName(implKlass); + ClusterProcessManager clusterProcessMgr = klass.newInstance(); + LOG.info("Created ClusterProcessManager as " + implKlass); + clusterProcessMgr.init(conf); + return new MRCluster(conf, clusterProcessMgr); } - @Override - protected JTClient createMaster(RemoteProcess masterDaemon) + protected JTClient createJTClient(RemoteProcess jtDaemon) throws IOException { - return new JTClient(getConf(), masterDaemon); + return new JTClient(getConf(), jtDaemon); } - @Override - protected TTClient createSlave(RemoteProcess slaveDaemon) + protected TTClient createTTClient(RemoteProcess ttDaemon) throws IOException { - return new TTClient(getConf(), slaveDaemon); + return new TTClient(getConf(), ttDaemon); + } + + public JTClient getJTClient() { + Iterator it = getDaemons().get(Role.JT).iterator(); + return (JTClient) it.next(); + } + + public List getTTClients() { + return (List) getDaemons().get(Role.TT); + } + + public TTClient getTTClient(String hostname) { + for (TTClient c : getTTClients()) { + if (c.getHostName().equals(hostname)) { + return c; + } + } + return null; } @Override public void ensureClean() throws IOException { //TODO: ensure that no jobs/tasks are running //restart the cluster if cleanup fails - JTClient jtClient = getMaster(); + JTClient jtClient = getJTClient(); JobInfo[] jobs = jtClient.getProxy().getAllJobInfo(); for(JobInfo 
job : jobs) { jtClient.getClient().killJob( org.apache.hadoop.mapred.JobID.downgrade(job.getID())); } } + + @Override + protected AbstractDaemonClient createClient( + RemoteProcess process) throws IOException { + if (Role.JT.equals(process.getRole())) { + return createJTClient(process); + } else if (Role.TT.equals(process.getRole())) { + return createTTClient(process); + } else throw new IOException("Role: "+ process.getRole() + " is not " + + "applicable to MRCluster"); + } + + public static class MRProcessManager extends HadoopDaemonRemoteCluster{ + private static final List mrDaemonInfos = + Arrays.asList(new HadoopDaemonInfo[]{ + new HadoopDaemonInfo("jobtracker", Role.JT, "masters"), + new HadoopDaemonInfo("tasktracker", Role.TT, "slaves")}); + public MRProcessManager() { + super(mrDaemonInfos); + } + } } Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java?rev=1077263&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/AbstractDaemonCluster.java Fri Mar 4 03:57:38 2011 @@ -0,0 +1,223 @@ +package org.apache.hadoop.test.system; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.test.system.process.ClusterProcessManager; +import org.apache.hadoop.test.system.process.RemoteProcess; + +/** + * 
Abstract class which represent the cluster having multiple daemons. + */ +@SuppressWarnings("unchecked") +public abstract class AbstractDaemonCluster { + + private static final Log LOG = LogFactory.getLog(AbstractDaemonCluster.class); + + private Configuration conf; + protected ClusterProcessManager clusterManager; + private Map, List> daemons = + new LinkedHashMap, List>(); + + /** + * Constructor to create a cluster client.
+ * + * @param conf + * Configuration to be used while constructing the cluster. + * @param rcluster + * process manger instance to be used for managing the daemons. + * + * @throws IOException + */ + public AbstractDaemonCluster(Configuration conf, + ClusterProcessManager rcluster) throws IOException { + this.conf = conf; + this.clusterManager = rcluster; + createAllClients(); + } + + protected void createAllClients() throws IOException { + for (RemoteProcess p : clusterManager.getAllProcesses()) { + List dms = daemons.get(p.getRole()); + if (dms == null) { + dms = new ArrayList(); + daemons.put(p.getRole(), dms); + } + dms.add(createClient(p)); + } + } + + /** + * Method to create the daemon client.
+ * + * @param remoteprocess + * to manage the daemon. + * @return instance of the daemon client + * + * @throws IOException + */ + protected abstract AbstractDaemonClient + createClient(RemoteProcess process) throws IOException; + + /** + * Get the global cluster configuration which was used to create the + * cluster.
+ * + * @return global configuration of the cluster. + */ + public Configuration getConf() { + return conf; + } + + /** + * + + /** + * Return the client handle of all the Daemons.
+ * + * @return map of role to daemon clients' list. + */ + public Map, List> getDaemons() { + return daemons; + } + + /** + * Checks if the cluster is ready for testing.
+ * Algorithm for checking is as follows :
+ * <ul>
+ * <li>Wait for Daemon to come up</li>
+ * <li>Check if daemon is ready</li>
+ * <li>If one of the daemon is not ready, return false</li>
+ * </ul>
+ * + * @return true if whole cluster is ready. + * + * @throws IOException + */ + public boolean isReady() throws IOException { + for (List set : daemons.values()) { + for (AbstractDaemonClient daemon : set) { + waitForDaemon(daemon); + if (!daemon.isReady()) { + return false; + } + } + } + return true; + } + + protected void waitForDaemon(AbstractDaemonClient d) { + while(true) { + try { + LOG.info("Waiting for daemon in host to come up : " + d.getHostName()); + d.connect(); + break; + } catch (IOException e) { + try { + Thread.sleep(10000); + } catch (InterruptedException ie) { + } + } + } + } + + /** + * Starts the cluster daemons. + * @throws IOException + */ + public void start() throws IOException { + clusterManager.start(); + } + + /** + * Stops the cluster daemons. + * @throws IOException + */ + public void stop() throws IOException { + clusterManager.stop(); + } + + /** + * Connect to daemon RPC ports. + * @throws IOException + */ + public void connect() throws IOException { + for (List set : daemons.values()) { + for (AbstractDaemonClient daemon : set) { + daemon.connect(); + } + } + } + + /** + * Disconnect to daemon RPC ports. + * @throws IOException + */ + public void disconnect() throws IOException { + for (List set : daemons.values()) { + for (AbstractDaemonClient daemon : set) { + daemon.disconnect(); + } + } + } + + /** + * Ping all the daemons of the cluster. + * @throws IOException + */ + public void ping() throws IOException { + for (List set : daemons.values()) { + for (AbstractDaemonClient daemon : set) { + LOG.info("Daemon is : " + daemon.getHostName() + " pinging...."); + daemon.ping(); + } + } + } + + /** + * Connect to the cluster and ensure that it is clean to run tests. 
+ * @throws Exception + */ + public void setUp() throws Exception { + while (!isReady()) { + Thread.sleep(1000); + } + connect(); + ping(); + clearAllControlActions(); + ensureClean(); + } + + public void clearAllControlActions() throws IOException { + for (List set : daemons.values()) { + for (AbstractDaemonClient daemon : set) { + LOG.info("Daemon is : " + daemon.getHostName() + " pinging...."); + daemon.getProxy().clearActions(); + } + } + } + + /** + * Ensure that the cluster is clean to run tests. + * @throws IOException + */ + public void ensureClean() throws IOException { + } + + /** + * Ensure that cluster is clean. Disconnect from the RPC ports of the daemons. + * @throws IOException + */ + public void tearDown() throws IOException { + ensureClean(); + clearAllControlActions(); + disconnect(); + } +} + Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java?rev=1077263&r1=1077262&r2=1077263&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/ClusterProcessManager.java Fri Mar 4 03:57:38 2011 @@ -1,68 +1,48 @@ package org.apache.hadoop.test.system.process; import java.io.IOException; -import java.util.Map; +import java.util.List; +import java.util.Set; import org.apache.hadoop.conf.Configuration; /** - * Interface to manage the remote processes in the master-slave cluster. + * Interface to manage the remote processes in the cluster. 
*/ public interface ClusterProcessManager { /** - * The configuration key to specify the concrete implementation of the - * {@link ClusterProcessManager} to be used by - * {@link ClusterProcessManagerFactory}. - */ - String IMPL_CLASS = "test.system.clusterprocessmanager.impl.class"; - - /** - * Enumeration used to specify the types of the clusters which are supported - * by the concrete implementations of {@link ClusterProcessManager}. - */ - public enum ClusterType { - MAPRED, HDFS - } - - /** - * Initialization method to set cluster type and also pass the configuration - * object which is required by the ClusterProcessManager to manage the - * cluster.
+ * Initialization method to pass the configuration object which is required + * by the ClusterProcessManager to manage the cluster.
* Configuration object should typically contain all the parameters which are * required by the implementations.
* - * @param t type of the cluster to be managed. * @param conf configuration containing values of the specific keys which * are required by the implementation of the cluster process manger. * - * @throws Exception when initialization fails. + * @throws IOException when initialization fails. */ - void init(ClusterType t, Configuration conf) throws Exception; + void init(Configuration conf) throws IOException; /** - * Getter for master daemon process for managing the master daemon.
- * - * @return master daemon process. + * Get the list of RemoteProcess handles of all the remote processes. */ - RemoteProcess getMaster(); + List getAllProcesses(); /** - * Getter for slave daemon process for managing the slaves.
- * - * @return map of slave hosts to slave daemon process. + * Get all the roles this cluster's daemon processes have. */ - Map getSlaves(); + Set> getRoles(); /** - * Method to start the cluster including all master and slaves.
+ * Method to start all the remote daemons.
* * @throws IOException if startup procedure fails. */ void start() throws IOException; /** - * Method to shutdown all the master and slaves.
+ * Method to shutdown all the remote daemons.
* * @throws IOException if shutdown procedure fails. */ Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java?rev=1077263&r1=1077262&r2=1077263&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java Fri Mar 4 03:57:38 2011 @@ -7,7 +7,9 @@ import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.HashMap; -import java.util.Map; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -26,11 +28,12 @@ import org.apache.hadoop.util.Shell.Shel * Following will be the format which the final command execution would look : *
* - * ssh master-host 'hadoop-home/bin/hadoop-daemon.sh --script scriptName - * --config HADOOP_CONF_DIR (start|stop) masterCommand' + * ssh host 'hadoop-home/bin/hadoop-daemon.sh --script scriptName + * --config HADOOP_CONF_DIR (start|stop) command' * */ -public class HadoopDaemonRemoteCluster implements ClusterProcessManager { +public abstract class HadoopDaemonRemoteCluster + implements ClusterProcessManager { private static final Log LOG = LogFactory .getLog(HadoopDaemonRemoteCluster.class.getName()); @@ -53,62 +56,46 @@ public class HadoopDaemonRemoteCluster i private String hadoopHome; private String hadoopConfDir; private String deployed_hadoopConfDir; - private String masterCommand; - private String slaveCommand; + private final Set> roles; - private RemoteProcess master; - private Map slaves; + private final List daemonInfos; + private List processes; + + public static class HadoopDaemonInfo { + public final String cmd; + public final Enum role; + public final String hostFile; + public HadoopDaemonInfo(String cmd, Enum role, String hostFile) { + super(); + this.cmd = cmd; + this.role = role; + this.hostFile = hostFile; + } + } + + public HadoopDaemonRemoteCluster(List daemonInfos) { + this.daemonInfos = daemonInfos; + this.roles = new HashSet>(); + for (HadoopDaemonInfo info : daemonInfos) { + this.roles.add(info.role); + } + } @Override - public void init(ClusterType t, Configuration conf) throws Exception { - /* - * Initialization strategy of the HadoopDaemonRemoteCluster is three staged - * process: 1. Populate script names based on the type of passed cluster. 2. - * Populate the required directories. 3. Populate the master and slaves. 
- */ - populateScriptNames(t); + public void init(Configuration conf) throws IOException { populateDirectories(conf); - this.slaves = new HashMap(); + this.processes = new ArrayList(); populateDaemons(deployed_hadoopConfDir); } - /** - * Method to populate the required master and slave commands which are used to - * manage the cluster.
- * - * @param t - * type of cluster to be initialized. - * - * @throws UnsupportedOperationException - * if the passed cluster type is not MAPRED or HDFS - */ - private void populateScriptNames(ClusterType t) { - switch (t) { - case MAPRED: - masterCommand = "jobtracker"; - slaveCommand = "tasktracker"; - if (LOG.isDebugEnabled()) { - LOG.debug("Created mapred hadoop daemon remote cluster manager with " - + "scriptName: mapred, masterCommand: jobtracker, " - + "slaveCommand: tasktracker"); - } - break; - case HDFS: - masterCommand = "namenode"; - slaveCommand = "datanode"; - if (LOG.isDebugEnabled()) { - LOG.debug("Created hdfs hadoop daemon remote cluster manager with " - + "scriptName: hdfs, masterCommand: namenode, " - + "slaveCommand: datanode"); - } - break; - default: - LOG.error("Cluster type :" + t - + "is not supported currently by HadoopDaemonRemoteCluster"); - throw new UnsupportedOperationException( - "The specified cluster type is not supported by the " + - "HadoopDaemonRemoteCluster"); - } + @Override + public List getAllProcesses() { + return processes; + } + + @Override + public Set> getRoles() { + return roles; } /** @@ -116,14 +103,14 @@ public class HadoopDaemonRemoteCluster i * * @param conf * Configuration object containing values for - * TEST_SYSTEM_HADOOPHOME_CONF_KEY and - * TEST_SYSTEM_HADOOPCONFDIR_CONF_KEY + * CONF_HADOOPHOME and + * CONF_HADOOPCONFDIR * * @throws IllegalArgumentException * if the configuration or system property set does not contain * values for the required keys. 
*/ - private void populateDirectories(Configuration conf) { + protected void populateDirectories(Configuration conf) { hadoopHome = conf.get(CONF_HADOOPHOME, System .getProperty(CONF_HADOOPHOME)); hadoopConfDir = conf.get(CONF_HADOOPCONFDIR, System @@ -147,69 +134,57 @@ public class HadoopDaemonRemoteCluster i } @Override - public RemoteProcess getMaster() { - return master; - } - - @Override - public Map getSlaves() { - return slaves; - } - - @Override public void start() throws IOException { - // start master first. - master.start(); - for (RemoteProcess slave : slaves.values()) { - slave.start(); + for (RemoteProcess process : processes) { + process.start(); } } @Override public void stop() throws IOException { - master.kill(); - for (RemoteProcess slave : slaves.values()) { - slave.kill(); + for (RemoteProcess process : processes) { + process.kill(); } } - private void populateDaemons(String confLocation) throws IOException { - File mastersFile = new File(confLocation, "masters"); - File slavesFile = new File(confLocation, "slaves"); + protected void populateDaemon(String confLocation, + HadoopDaemonInfo info) throws IOException { + File hostFile = new File(confLocation, info.hostFile); BufferedReader reader = null; + reader = new BufferedReader(new FileReader(hostFile)); + String host = null; try { - reader = new BufferedReader(new FileReader(mastersFile)); - String masterHost = null; - masterHost = reader.readLine(); - if (masterHost != null && !masterHost.trim().isEmpty()) { - master = new ScriptDaemon(masterCommand, masterHost); + boolean foundAtLeastOne = false; + while ((host = reader.readLine()) != null) { + if (host.trim().isEmpty()) { + throw new IllegalArgumentException( + "Hostname could not be found in file " + info.hostFile); + } + InetAddress addr = InetAddress.getByName(host); + RemoteProcess process = new ScriptDaemon(info.cmd, + addr.getCanonicalHostName(), info.role); + processes.add(process); + foundAtLeastOne = true; + } + if 
(!foundAtLeastOne) { + throw new IllegalArgumentException("Alteast one hostname " + + "is required to be present in file - " + info.hostFile); } } finally { try { reader.close(); } catch (Exception e) { - LOG.error("Can't read masters file from " + confLocation); - } - - } - try { - reader = new BufferedReader(new FileReader(slavesFile)); - String slaveHost = null; - while ((slaveHost = reader.readLine()) != null) { - InetAddress addr = InetAddress.getByName(slaveHost); - RemoteProcess slave = new ScriptDaemon(slaveCommand, - addr.getCanonicalHostName()); - slaves.put(addr.getCanonicalHostName(), slave); - } - } finally { - try { - reader.close(); - } catch (Exception e) { - LOG.error("Can't read slaves file from " + confLocation); + LOG.warn("Could not close reader"); } } } + protected void populateDaemons(String confLocation) throws IOException { + for (HadoopDaemonInfo info : daemonInfos) { + populateDaemon(confLocation, info); + } + } + /** * The core daemon class which actually implements the remote process * management of actual daemon processes in the cluster. 
@@ -222,10 +197,12 @@ public class HadoopDaemonRemoteCluster i private static final String SCRIPT_NAME = "hadoop-daemon.sh"; private final String daemonName; private final String hostName; + private final Enum role; - public ScriptDaemon(String daemonName, String hostName) { + public ScriptDaemon(String daemonName, String hostName, Enum role) { this.daemonName = daemonName; this.hostName = hostName; + this.role = role; } @Override @@ -272,6 +249,10 @@ public class HadoopDaemonRemoteCluster i public void start() throws IOException { buildCommandExecutor(START_COMMAND).execute(); } - } + @Override + public Enum getRole() { + return role; + } + } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java?rev=1077263&r1=1077262&r2=1077263&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/test/system/process/RemoteProcess.java Fri Mar 4 03:57:38 2011 @@ -26,4 +26,11 @@ public interface RemoteProcess { * @throws IOException if shutdown fails. */ void kill() throws IOException; + + /** + * Get the role of the Daemon in the cluster. + * + * @return Enum + */ + Enum getRole(); } \ No newline at end of file