Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm
Reply-To: mapreduce-dev@hadoop.apache.org
Delivered-To: mailing list mapreduce-commits@hadoop.apache.org
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r956666 [1/4] - in /hadoop/mapreduce/trunk: ./ ivy/ src/test/aop/build/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/testjar/ src/test/system/ src/test/system/aop/ src/test/system/aop/org/ src/test/system/aop/org/apache/ src/te...
Date: Mon, 21 Jun 2010 19:02:51 -0000
To: mapreduce-commits@hadoop.apache.org
From: cos@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20100621190252.0845823888DD@eris.apache.org>

Author: cos
Date: Mon Jun 21 19:02:49 2010
New Revision: 956666

URL: http://svn.apache.org/viewvc?rev=956666&view=rev
Log:
MAPREDUCE-1774. Large-scale Automated Framework. Contributed by Sharad Agarwal, Sreekanth Ramakrishnan, Konstantin Boudnik et al.
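MAPREDUCE-1774 introduces Herriot, a framework for running system tests against a real, deployed MapReduce cluster by weaving test-only APIs into the daemons with AspectJ. As a rough orientation, a system test built on the classes added in this commit (MRCluster, JTClient) takes approximately the following shape. The factory and proxy method names are taken from the added sources, but the driver itself is an illustrative sketch, not code from this commit:

    // Illustrative sketch only: drive a live cluster through the Herriot API.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.test.system.JTClient;
    import org.apache.hadoop.mapreduce.test.system.MRCluster;

    public class HerriotSmokeTest {            // hypothetical test driver
      public static void main(String[] args) throws Exception {
        MRCluster cluster = MRCluster.createCluster(new Configuration());
        cluster.setUp();                       // connect to the running daemons
        JTClient jt = cluster.getJTClient();
        // JTProtocol is woven into the JobTracker by JTProtocolAspect (below).
        System.out.println("Jobs known to the JT: "
            + jt.getProxy().getAllJobInfo().length);
        cluster.tearDown();
      }
    }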
Added:
    hadoop/mapreduce/trunk/ivy/hadoop-mapred-instrumented-template.xml
    hadoop/mapreduce/trunk/ivy/hadoop-mapred-instrumented-test-template.xml
    hadoop/mapreduce/trunk/src/test/mapred/testjar/JobKillCommitter.java
    hadoop/mapreduce/trunk/src/test/mapred/testjar/UserNamePermission.java
    hadoop/mapreduce/trunk/src/test/system/
    hadoop/mapreduce/trunk/src/test/system/aop/
    hadoop/mapreduce/trunk/src/test/system/aop/org/
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobInProgressAspect.aj
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/MapReducePolicyProviderAspect.aj
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapreduce/
    hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapreduce/ClusterAspect.aj
    hadoop/mapreduce/trunk/src/test/system/conf/
    hadoop/mapreduce/trunk/src/test/system/conf/system-test-mapred.xml
    hadoop/mapreduce/trunk/src/test/system/java/
    hadoop/mapreduce/trunk/src/test/system/java/org/
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/JobInfoImpl.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTInfoImpl.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TTTaskInfoImpl.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapred/TaskInfoImpl.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/FinishTaskControlAction.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTClient.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JTProtocol.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/JobInfo.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRCluster.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/MRDaemonClient.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTClient.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTInfo.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTProtocol.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TTTaskInfo.java
    hadoop/mapreduce/trunk/src/test/system/java/org/apache/hadoop/mapreduce/test/system/TaskInfo.java
    hadoop/mapreduce/trunk/src/test/system/test/
    hadoop/mapreduce/trunk/src/test/system/test/org/
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestCluster.java
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestControlledJob.java
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheModifiedFile.java
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCachePrivateFile.java
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestFileOwner.java
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestJobKill.java
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestPushConfig.java
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestSortValidate.java
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskKilling.java
    hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestTaskOwner.java

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/build.xml
    hadoop/mapreduce/trunk/ivy.xml
    hadoop/mapreduce/trunk/ivy/libraries.properties
    hadoop/mapreduce/trunk/src/test/aop/build/aop.xml
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=956666&r1=956665&r2=956666&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jun 21 19:02:49 2010
@@ -6,6 +6,9 @@ Trunk (unreleased changes)

   NEW FEATURES

+    MAPREDUCE-1774. Large-scale Automated Framework (Sharad Agarwal, Sreekanth
+    Ramakrishnan, Konstantin Boudnik et al. via cos)
+
     MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708.
     (Joshua Harlow via shv)

Modified: hadoop/mapreduce/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/build.xml?rev=956666&r1=956665&r2=956666&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/build.xml (original)
+++ hadoop/mapreduce/trunk/build.xml Mon Jun 21 19:02:49 2010
    [XML diff hunks stripped of their markup by the mail archiver; not recoverable]

Modified: hadoop/mapreduce/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/ivy.xml?rev=956666&r1=956665&r2=956666&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/ivy.xml (original)
+++ hadoop/mapreduce/trunk/ivy.xml Mon Jun 21 19:02:49 2010
    [XML diff hunks stripped of their markup by the mail archiver; not recoverable]

Added: hadoop/mapreduce/trunk/ivy/hadoop-mapred-instrumented-template.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/ivy/hadoop-mapred-instrumented-template.xml?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/ivy/hadoop-mapred-instrumented-template.xml (added)
+++ hadoop/mapreduce/trunk/ivy/hadoop-mapred-instrumented-template.xml Mon Jun 21 19:02:49 2010
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- [Apache license header stripped by the archiver] -->
+<project>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-mapred-instrumented</artifactId>
+  <packaging>jar</packaging>
+  <version>@version</version>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>0.22.0-dev-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+</project>

Added: hadoop/mapreduce/trunk/ivy/hadoop-mapred-instrumented-test-template.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/ivy/hadoop-mapred-instrumented-test-template.xml?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/ivy/hadoop-mapred-instrumented-test-template.xml (added)
+++ hadoop/mapreduce/trunk/ivy/hadoop-mapred-instrumented-test-template.xml Mon Jun 21 19:02:49 2010
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- [Apache license header stripped by the archiver] -->
+<project>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-mapred-test-instrumented</artifactId>
+  <packaging>jar</packaging>
+  <version>@version</version>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapred</artifactId>
+      <version>@version</version>
+    </dependency>
+  </dependencies>
+</project>

Modified: hadoop/mapreduce/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/ivy/libraries.properties?rev=956666&r1=956665&r2=956666&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/ivy/libraries.properties (original)
+++ hadoop/mapreduce/trunk/ivy/libraries.properties Mon Jun 21 19:02:49 2010
@@ -18,6 +18,7 @@ apacheant.version=1.7.1
 ant-task.version=2.0.10
 #Aspectj depedency for Fault injection
+#This property has to be updated synchronously with aop.xml
 aspectj.version=1.6.5
 avro.version=1.3.0
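The build.xml and aop.xml hunks above and below (lost to the archiver) wire AspectJ compile-time weaving into the instrumented build: the .aj files in src/test/system/aop are compiled against the production classes, and the woven result is packaged as the hadoop-mapred-instrumented jars. As a self-contained illustration of the mechanism only — not code from this commit — a minimal aspect that such a build would weave looks like this:

    // Illustrative only: a trivially small aspect, compiled by AspectJ's
    // iajc Ant task together with the production sources so that its advice
    // runs inside the instrumented daemons.
    package org.example;            // hypothetical package, not in the commit

    public aspect TraceAspect {
      // Match every public method of any class whose name ends in "Tracker".
      pointcut tracked() : execution(public * *..*Tracker.*(..));

      before() : tracked() {
        System.err.println("entering " + thisJoinPoint.getSignature());
      }
    }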
Modified: hadoop/mapreduce/trunk/src/test/aop/build/aop.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/aop/build/aop.xml?rev=956666&r1=956665&r2=956666&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/aop/build/aop.xml (original)
+++ hadoop/mapreduce/trunk/src/test/aop/build/aop.xml Mon Jun 21 19:02:49 2010
@@ -14,13 +14,42 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
    [XML hunk bodies at @@ -14,13 +14,42 @@, @@ -39,21 +68,27 @@,
    @@ -69,15 +104,133 @@, @@ -99,11 +252,12 @@ and @@ -129,4 +283,78 @@
    were stripped of their markup by the mail archiver; not recoverable]

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=956666&r1=956665&r2=956666&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Mon Jun 21 19:02:49 2010
@@ -36,6 +36,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -744,4 +745,38 @@ public class UtilsForTests {
     JobTracker jt = new JobTracker();
     return jt;
   }
+
+  /**
+   * Creates a file in DFS with the given permission, writes the input
+   * string into it, and closes it.
+   * @param dfs FileSystem in which the file is to be created
+   * @param URIPATH Path at which the file is to be created
+   * @param permission FsPermission to create the file with
+   * @param input String content to write into the file
+   * @return the (already closed) DataOutputStream of the created file
+   */
+  public static DataOutputStream
+      createTmpFileDFS(FileSystem dfs, Path URIPATH,
+      FsPermission permission, String input) throws Exception {
+    // Create the file at the given path, write the content, and close it.
+    DataOutputStream file =
+        FileSystem.create(dfs, URIPATH, permission);
+    file.writeBytes(input);
+    file.close();
+    return file;
+  }
+
+  /**
+   * Reduces the long task tracker name to just the FQDN of its host.
+   * @param taskTrackerLong String the long form of the task tracker name
+   * @return String the FQDN of the task tracker host
+   * @throws Exception
+   */
+  public static String getFQDNofTT(String taskTrackerLong) throws Exception {
+    // Extract the FQDN from the long tracker name: take the part after
+    // the first '_' and before the first ':'.
+    String[] firstSplit = taskTrackerLong.split("_");
+    String tmpOutput = firstSplit[1];
+    String[] secondSplit = tmpOutput.split(":");
+    String tmpTaskTracker = secondSplit[0];
+    return tmpTaskTracker;
+  }
 }
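To make the string surgery in getFQDNofTT concrete, here is a quick hedged example. The exact tracker-name layout is an assumption based on how the JobTracker names its trackers, and is not spelled out in this mail part:

    // Hypothetical illustration of getFQDNofTT's input/output contract,
    // assuming names of the form "tracker_<fqdn>:<local name>/<ip>:<port>".
    String longName = "tracker_host1.example.com:localhost/127.0.0.1:52342";
    String fqdn = UtilsForTests.getFQDNofTT(longName);
    System.out.println(fqdn);   // prints "host1.example.com"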
Added: hadoop/mapreduce/trunk/src/test/mapred/testjar/JobKillCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/testjar/JobKillCommitter.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/testjar/JobKillCommitter.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/testjar/JobKillCommitter.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package testjar;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class JobKillCommitter {
+  /**
+   * An output committer whose overridden setupJob method always throws,
+   * causing the job to fail during setup.
+   */
+  public static class CommitterWithFailSetup extends FileOutputCommitter {
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      throw new IOException();
+    }
+  }
+
+  /**
+   * A dummy output committer that does nothing.
+   */
+  public static class CommitterWithNoError extends FileOutputCommitter {
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+    }
+  }
+
+  /**
+   * An output committer whose overridden commitJob method always throws,
+   * causing the job to fail during cleanup.
+   */
+  public static class CommitterWithFailCleanup extends FileOutputCommitter {
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      throw new IOException();
+    }
+  }
+
+  /**
+   * A dummy mapper that does nothing.
+   */
+  public static class MapperPass
+      extends Mapper<LongWritable, Text, Text, Text> {
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+    }
+  }
+
+  /**
+   * A mapper that sleeps for ten seconds per record.
+   */
+  public static class MapperPassSleep
+      extends Mapper<LongWritable, Text, Text, Text> {
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      Thread.sleep(10000);
+    }
+  }
+
+  /**
+   * A mapper that fails by intentionally throwing an IOException.
+   */
+  public static class MapperFail
+      extends Mapper<LongWritable, Text, Text, Text> {
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      throw new IOException();
+    }
+  }
+
+  /**
+   * A reducer that fails by intentionally throwing an IOException.
+   */
+  public static class ReducerFail
+      extends Reducer<Text, Text, Text, Text> {
+    public void reduce(Text key, Iterator<Text> values, Context context)
+        throws IOException, InterruptedException {
+      throw new IOException();
+    }
+  }
+
+  /**
+   * An empty reducer that does nothing.
+   */
+  public static class ReducerPass
+      extends Reducer<Text, Text, Text, Text> {
+    public void reduce(Text key, Iterator<Text> values, Context context)
+        throws IOException, InterruptedException {
+    }
+  }
+}
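A hedged sketch — not part of the commit — of how a test might wire one of these classes into a job to provoke a deterministic setup-time failure. The driver class, job name, and omitted input/output paths are assumptions; JobConf.setOutputCommitter is the standard old-API hook:

    // Hypothetical driver: submit a job that is guaranteed to fail in setup.
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class FailSetupDemo {                 // hypothetical driver class
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(FailSetupDemo.class);
        conf.setJobName("fail-in-setup");        // assumed job name
        // Input/output paths omitted for brevity. With this committer,
        // setupJob throws IOException, so the job fails before a single
        // map task is launched.
        conf.setOutputCommitter(
            testjar.JobKillCommitter.CommitterWithFailSetup.class);
        JobClient.runJob(conf);                  // expected to fail
      }
    }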
Added: hadoop/mapreduce/trunk/src/test/mapred/testjar/UserNamePermission.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/testjar/UserNamePermission.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/testjar/UserNamePermission.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/testjar/UserNamePermission.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package testjar;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class UserNamePermission {
+
+  private static final Log LOG = LogFactory.getLog(UserNamePermission.class);
+
+  // This mapper reads the user name and passes it on to the reducer.
+  public static class UserNameMapper
+      extends Mapper<LongWritable, Text, Text, Text> {
+    Text key1 = new Text("UserName");
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      Text val = new Text(System.getProperty("user.name"));
+      context.write(key1, val);
+    }
+  }
+
+  // The reducer writes the user name to the output file,
+  // which is then validated by the test case.
+  public static class UserNameReducer
+      extends Reducer<Text, Text, Text, Text> {
+    public void reduce(Text key, Iterator<Text> values,
+        Context context) throws IOException, InterruptedException {
+      LOG.info("The key " + key);
+      if (values.hasNext()) {
+        Text val = values.next();
+        LOG.info("The value " + val);
+        context.write(key, new Text(System.getProperty("user.name")));
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Path outDir = new Path("output");
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "user name check");
+
+    job.setJarByClass(UserNamePermission.class);
+    job.setMapperClass(UserNamePermission.UserNameMapper.class);
+    job.setCombinerClass(UserNamePermission.UserNameReducer.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setReducerClass(UserNamePermission.UserNameReducer.class);
+    job.setNumReduceTasks(1);
+
+    job.setInputFormatClass(TextInputFormat.class);
+    TextInputFormat.addInputPath(job, new Path("input"));
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}
Added: hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj (added)
+++ hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JTProtocolAspect.aj Mon Jun 21 19:02:49 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+
+/**
+ * Aspect which injects the basic protocol functionality that is to be
+ * implemented by all the services which implement {@link ClientProtocol}.
+ *
+ * The aspect also injects a default implementation of {@link JTProtocol}.
+ */
+public aspect JTProtocolAspect {
+
+  // Make ClientProtocol extend JTProtocol.
+  declare parents : ClientProtocol extends JTProtocol;
+
+  /*
+   * Start of the default implementations of the JTProtocol methods.
+   */
+
+  public Configuration JTProtocol.getDaemonConf() throws IOException {
+    return null;
+  }
+
+  public JobInfo JTProtocol.getJobInfo(JobID jobID) throws IOException {
+    return null;
+  }
+
+  public TaskInfo JTProtocol.getTaskInfo(TaskID taskID) throws IOException {
+    return null;
+  }
+
+  public TTInfo JTProtocol.getTTInfo(String trackerName) throws IOException {
+    return null;
+  }
+
+  public JobInfo[] JTProtocol.getAllJobInfo() throws IOException {
+    return null;
+  }
+
+  public TaskInfo[] JTProtocol.getTaskInfo(JobID jobID) throws IOException {
+    return null;
+  }
+
+  public TTInfo[] JTProtocol.getAllTTInfo() throws IOException {
+    return null;
+  }
+
+  public boolean JTProtocol.isJobRetired(JobID jobID) throws IOException {
+    return false;
+  }
+
+  public String JTProtocol.getJobHistoryLocationForRetiredJob(JobID jobID)
+      throws IOException {
+    return "";
+  }
+}
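For readers new to AspectJ: `declare parents` and the `Type.method()` definitions above are inter-type declarations — the aspect compiles new supertypes and members into classes it does not own. A minimal self-contained illustration, using hypothetical types that are not part of this commit:

    // Stand-alone sketch of the inter-type declaration mechanism used above.
    interface Greeter { String greet(); }

    class Service { }               // an existing class we cannot edit

    aspect GreeterAspect {
      // Make Service implement Greeter, just like "declare parents" above.
      declare parents : Service implements Greeter;

      // Inject a default implementation of the new method into Service.
      public String Service.greet() {
        return "hello from an inter-type declaration";
      }
    }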
Added: hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj (added)
+++ hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobClientAspect.aj Mon Jun 21 19:02:49 2010
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+
+public privileged aspect JobClientAspect {
+
+  public ClientProtocol JobClient.getProtocol() {
+    return cluster.getClientProtocol();
+  }
+
+  public void JobClient.killJob(JobID id) throws IOException, InterruptedException {
+    cluster.getClientProtocol().killJob(
+        org.apache.hadoop.mapred.JobID.downgrade(id));
+  }
+}
Added: hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobInProgressAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobInProgressAspect.aj?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobInProgressAspect.aj (added)
+++ hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobInProgressAspect.aj Mon Jun 21 19:02:49 2010
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+
+/**
+ * Aspect that adds a utility method to JobInProgress to ease the
+ * construction of the JobInfo object.
+ */
+privileged aspect JobInProgressAspect {
+
+  /**
+   * Returns a read-only view of the JobInProgress object, which is used by
+   * the client.
+   *
+   * @return JobInfo of the current JobInProgress object
+   */
+  public JobInfo JobInProgress.getJobInfo() {
+    String historyLoc = getHistoryPath();
+    boolean isHistoryFileCopied =
+        (this.status.getHistoryFile() != null);
+    if (tasksInited.get()) {
+      return new JobInfoImpl(
+          this.getJobID(), this.isSetupLaunched(), this.isSetupFinished(), this
+              .isCleanupLaunched(), this.runningMaps(), this.runningReduces(),
+          this.pendingMaps(), this.pendingReduces(), this.finishedMaps(), this
+              .finishedReduces(), this.getStatus(), historyLoc, this
+              .getBlackListedTrackers(), false, this.numMapTasks,
+          this.numReduceTasks, isHistoryFileCopied);
+    } else {
+      return new JobInfoImpl(
+          this.getJobID(), false, false, false, 0, 0, this.pendingMaps(), this
+              .pendingReduces(), this.finishedMaps(), this.finishedReduces(),
+          this.getStatus(), historyLoc, this.getBlackListedTrackers(), this
+              .isComplete(), this.numMapTasks, this.numReduceTasks, false);
+    }
+  }
+
+  private String JobInProgress.getHistoryPath() {
+    String historyLoc = "";
+    if (this.isComplete()) {
+      historyLoc = this.getStatus().getHistoryFile();
+    } else {
+      Path jobHistoryDirectory = this.jobHistory.getJobHistoryLocation();
+      Path historypath =
+          JobHistory.getJobHistoryFile(
+              jobHistoryDirectory, this.getJobID(), this.profile.getUser());
+      historyLoc = historypath.toString();
+    }
+    return historyLoc;
+  }
+}
Added: hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj (added)
+++ hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/JobTrackerAspect.aj Mon Jun 21 19:02:49 2010
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+/**
+ * Aspect class which injects the code for the {@link JobTracker} class.
+ */
+public privileged aspect JobTrackerAspect {
+
+  public Configuration JobTracker.getDaemonConf() throws IOException {
+    return conf;
+  }
+
+  /**
+   * Method to get the read-only view of the job and its associated
+   * information.
+   *
+   * @param jobID id of the job for which information is required.
+   * @return JobInfo of the job requested
+   * @throws IOException
+   */
+  public JobInfo JobTracker.getJobInfo(JobID jobID) throws IOException {
+    JobInProgress jip = jobs.get(org.apache.hadoop.mapred.JobID
+        .downgrade(jobID));
+    if (jip == null) {
+      LOG.warn("No job present for : " + jobID);
+      return null;
+    }
+    JobInfo info;
+    synchronized (jip) {
+      info = jip.getJobInfo();
+    }
+    return info;
+  }
+
+  /**
+   * Method to get the read-only view of the task and its associated
+   * information.
+   *
+   * @param taskID id of the task for which information is required.
+   * @return TaskInfo of the task requested
+   * @throws IOException
+   */
+  public TaskInfo JobTracker.getTaskInfo(TaskID taskID) throws IOException {
+    TaskInProgress tip = getTip(org.apache.hadoop.mapred.TaskID
+        .downgrade(taskID));
+    if (tip == null) {
+      LOG.warn("No task present for : " + taskID);
+      return null;
+    }
+    return getTaskInfo(tip);
+  }
+
+  public TTInfo JobTracker.getTTInfo(String trackerName) throws IOException {
+    org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker tt = taskTrackers
+        .get(trackerName);
+    if (tt == null) {
+      LOG.warn("No task tracker with name : " + trackerName + " found");
+      return null;
+    }
+    TaskTrackerStatus status = tt.getStatus();
+    TTInfo info = new TTInfoImpl(status.trackerName, status);
+    return info;
+  }
+
+  // XXX The two methods below do not reuse getJobInfo and getTaskInfo
+  // because the retire-jobs thread may remove a job from the JobTracker's
+  // memory while the RPC call is being processed.
+  public JobInfo[] JobTracker.getAllJobInfo() throws IOException {
+    List<JobInfo> infoList = new ArrayList<JobInfo>();
+    synchronized (jobs) {
+      for (JobInProgress jip : jobs.values()) {
+        JobInfo info = jip.getJobInfo();
+        infoList.add(info);
+      }
+    }
+    return infoList.toArray(new JobInfo[infoList.size()]);
+  }
+
+  public TaskInfo[] JobTracker.getTaskInfo(JobID jobID) throws IOException {
+    JobInProgress jip = jobs.get(org.apache.hadoop.mapred.JobID
+        .downgrade(jobID));
+    if (jip == null) {
+      LOG.warn("Unable to find job : " + jobID);
+      return null;
+    }
+    List<TaskInfo> infoList = new ArrayList<TaskInfo>();
+    synchronized (jip) {
+      for (TaskInProgress tip : jip.setup) {
+        infoList.add(getTaskInfo(tip));
+      }
+      for (TaskInProgress tip : jip.maps) {
+        infoList.add(getTaskInfo(tip));
+      }
+      for (TaskInProgress tip : jip.reduces) {
+        infoList.add(getTaskInfo(tip));
+      }
+      for (TaskInProgress tip : jip.cleanup) {
+        infoList.add(getTaskInfo(tip));
+      }
+    }
+    return infoList.toArray(new TaskInfo[infoList.size()]);
+  }
+
+  public TTInfo[] JobTracker.getAllTTInfo() throws IOException {
+    List<TTInfo> infoList = new ArrayList<TTInfo>();
+    synchronized (taskTrackers) {
+      for (TaskTracker tt : taskTrackers.values()) {
+        TaskTrackerStatus status = tt.getStatus();
+        TTInfo info = new TTInfoImpl(status.trackerName, status);
+        infoList.add(info);
+      }
+    }
+    return infoList.toArray(new TTInfo[infoList.size()]);
+  }
+
+  public boolean JobTracker.isJobRetired(JobID id) throws IOException {
+    return retireJobs.get(
+        org.apache.hadoop.mapred.JobID.downgrade(id)) != null;
+  }
+
+  public String JobTracker.getJobHistoryLocationForRetiredJob(
+      JobID id) throws IOException {
+    String historyFile = this.getJobStatus(id).getHistoryFile();
+    if (historyFile == null) {
+      throw new IOException("The retired job information for the job : "
+          + id + " is not found");
+    } else {
+      return historyFile;
+    }
+  }
+
+  pointcut getVersionAspect(String protocol, long clientVersion) :
+      execution(public long JobTracker.getProtocolVersion(String,
+      long) throws IOException) && args(protocol, clientVersion);
+
+  long around(String protocol, long clientVersion) :
+      getVersionAspect(protocol, clientVersion) {
+    if (protocol.equals(DaemonProtocol.class.getName())) {
+      return DaemonProtocol.versionID;
+    } else if (protocol.equals(JTProtocol.class.getName())) {
+      return JTProtocol.versionID;
+    } else {
+      return proceed(protocol, clientVersion);
+    }
+  }
+
+  /**
+   * Pointcut which monitors the start of the JobTracker and marks the
+   * daemon as ready once it is constructed.
+   */
+  pointcut jtConstructorPointCut() :
+      call(JobTracker.new(..));
+
+  after() returning (JobTracker tracker) : jtConstructorPointCut() {
+    try {
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      tracker.setUser(ugi.getShortUserName());
+    } catch (IOException e) {
+      tracker.LOG.warn("Unable to get the user information for the "
+          + "JobTracker");
+    }
+    tracker.setReady(true);
+  }
+
+  private TaskInfo JobTracker.getTaskInfo(TaskInProgress tip) {
+    TaskStatus[] status = tip.getTaskStatuses();
+    if (status == null) {
+      if (tip.isMapTask()) {
+        status = new MapTaskStatus[]{};
+      } else {
+        status = new ReduceTaskStatus[]{};
+      }
+    }
+    String[] trackers =
+        (String[]) tip.getActiveTasks().values().toArray(
+            new String[tip.getActiveTasks().values().size()]);
+    TaskInfo info =
+        new TaskInfoImpl(tip.getTIPId(), tip.getProgress(), tip
+            .getActiveTasks().size(), tip.numKilledTasks(), tip
+            .numTaskFailures(), status, (tip.isJobSetupTask() || tip
+            .isJobCleanupTask()), trackers);
+    return info;
+  }
+}
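The around advice on getProtocolVersion above is what lets a test-side client negotiate JTProtocol with a live JobTracker over ordinary Hadoop RPC. A hedged sketch of the client side follows; the address is a placeholder, and in practice Herriot's JTClient encapsulates this call. RPC.getProxy is used here with the same signature the TaskAspect below uses:

    // Hypothetical, minimal client-side acquisition of the woven JTProtocol.
    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.mapreduce.test.system.JTProtocol;

    public class JTProxyDemo {                   // hypothetical class
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        InetSocketAddress jtAddr =
            new InetSocketAddress("jt.example.com", 9001);  // placeholder
        JTProtocol proxy = (JTProtocol) RPC.getProxy(
            JTProtocol.class, JTProtocol.versionID, jtAddr, conf);
        System.out.println("jobs: " + proxy.getAllJobInfo().length);
      }
    }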
Added: hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/MapReducePolicyProviderAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/MapReducePolicyProviderAspect.aj?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/MapReducePolicyProviderAspect.aj (added)
+++ hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/MapReducePolicyProviderAspect.aj Mon Jun 21 19:02:49 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.test.system.TTProtocol;
+import org.apache.hadoop.security.authorize.Service;
+import org.apache.hadoop.test.system.DaemonProtocol;
+
+/**
+ * This aspect adds two MR-specific Herriot protocols to the list of
+ * 'authorized' Herriot protocols. Protocol descriptors such as
+ * 'security.tt.protocol.acl' have to be added to hadoop-policy.xml
+ * if that file is present.
+ */
+public privileged aspect MapReducePolicyProviderAspect {
+  private static final Log LOG = LogFactory
+      .getLog(MapReducePolicyProviderAspect.class);
+  ArrayList<Service> herriotMRServices = null;
+
+  pointcut updateMRServices() :
+      execution(public Service[] MapReducePolicyProvider.getServices());
+
+  Service[] around() : updateMRServices() {
+    herriotMRServices = new ArrayList<Service>();
+    for (Service s : MapReducePolicyProvider.mapReduceServices) {
+      LOG.debug("Copying configured protocol to "
+          + s.getProtocol().getCanonicalName());
+      herriotMRServices.add(s);
+    }
+    herriotMRServices.add(new Service("security.daemon.protocol.acl",
+        DaemonProtocol.class));
+    herriotMRServices.add(new Service("security.tt.protocol.acl",
+        TTProtocol.class));
+    final Service[] retArray = herriotMRServices
+        .toArray(new Service[herriotMRServices.size()]);
+    LOG.debug("Number of configured protocols to return: " + retArray.length);
+    return retArray;
+  }
+}
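Where a cluster carries a hadoop-policy.xml, the two Herriot ACLs that the aspect registers would be declared like any other service-level ACL. A sketch with permissive values — the '*' values are an assumption for a test cluster, not mandated by the commit:

    <!-- Hypothetical hadoop-policy.xml entries for the Herriot protocols. -->
    <property>
      <name>security.daemon.protocol.acl</name>
      <value>*</value>
    </property>
    <property>
      <name>security.tt.protocol.acl</name>
      <value>*</value>
    </property>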
Added: hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj (added)
+++ hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/TaskAspect.aj Mon Jun 21 19:02:49 2010
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.Task.TaskReporter;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.test.system.ControlAction;
+import org.apache.hadoop.test.system.DaemonProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTProtocol;
+
+public privileged aspect TaskAspect {
+
+  private static final Log LOG = LogFactory.getLog(TaskAspect.class);
+
+  private Object waitObject = new Object();
+  private AtomicBoolean isWaitingForSignal = new AtomicBoolean(false);
+
+  private DaemonProtocol daemonProxy;
+
+  pointcut taskDoneIntercept(Task task) : execution(
+      public void Task.done(..)) && target(task);
+
+  void around(Task task) : taskDoneIntercept(task) {
+    if (task.isJobCleanupTask() || task.isJobSetupTask()
+        || task.isTaskCleanupTask()) {
+      proceed(task);
+      return;
+    }
+    Configuration conf = task.getConf();
+    boolean controlEnabled =
+        FinishTaskControlAction.isControlActionEnabled(conf);
+    if (controlEnabled) {
+      LOG.info("Task control enabled, waiting till client sends signal to "
+          + "complete");
+      try {
+        synchronized (waitObject) {
+          isWaitingForSignal.set(true);
+          waitObject.wait();
+        }
+      } catch (InterruptedException e) {
+      }
+    }
+    proceed(task);
+    return;
+  }
+
+  pointcut taskStatusUpdate(TaskReporter reporter, TaskAttemptID id) :
+      call(public boolean TaskUmbilicalProtocol.ping(TaskAttemptID))
+      && this(reporter) && args(id);
+
+  after(TaskReporter reporter, TaskAttemptID id) throws IOException :
+      taskStatusUpdate(reporter, id) {
+    synchronized (waitObject) {
+      if (isWaitingForSignal.get()) {
+        ControlAction[] actions = daemonProxy.getActions(
+            id.getTaskID());
+        if (actions.length == 0) {
+          return;
+        }
+        boolean shouldProceed = false;
+        for (ControlAction action : actions) {
+          if (action instanceof FinishTaskControlAction) {
+            LOG.info("Recv : Control task action to finish task id: "
+                + action.getTarget());
+            shouldProceed = true;
+            daemonProxy.removeAction(action);
+            LOG.info("Removed the control action from TaskTracker");
+            break;
+          }
+        }
+        if (shouldProceed) {
+          LOG.info("Notifying the task to completion");
+          waitObject.notify();
+        }
+      }
+    }
+  }
+
+  pointcut rpcInterceptor(Class k, long version, InetSocketAddress addr,
+      Configuration conf) : call(
+      public static * RPC.getProxy(Class, long, InetSocketAddress,
+      Configuration)) && args(k, version, addr, conf) &&
+      within(org.apache.hadoop.mapred.Child);
+
+  after(Class k, long version, InetSocketAddress addr, Configuration conf)
+      throws IOException : rpcInterceptor(k, version, addr, conf) {
+    daemonProxy =
+        (TTProtocol) RPC.getProxy(
+            TTProtocol.class, TTProtocol.versionID, addr, conf);
+  }
+}
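The advice above traps a task in done() until the test releases it. A heavily hedged sketch of the test side follows; configureControlActionForJob and sendAction are the names one would expect from this commit's FinishTaskControlAction and DaemonProtocol, but they are not shown in this mail part, so treat them as assumptions:

    // Hypothetical test-side release of a task held by TaskAspect.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.TaskID;
    import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;

    Configuration conf = new Configuration();
    // 1. Mark the job so TaskAspect traps each task in done() (assumed API).
    FinishTaskControlAction.configureControlActionForJob(conf);
    // ... submit the job with conf, find the TaskID of a trapped task ...
    TaskID taskId = null;  // placeholder: obtained via JTClient in a real test
    // 2. Queue the control action on the task's TaskTracker (assumed API);
    //    TaskAspect's ping interception sees it and notifies the waiting task:
    // ttClient.getProxy().sendAction(new FinishTaskControlAction(taskId));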
Added: hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj (added)
+++ hadoop/mapreduce/trunk/src/test/system/aop/org/apache/hadoop/mapred/TaskTrackerAspect.aj Mon Jun 21 19:02:49 2010
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.TTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
+import org.apache.hadoop.mapred.TTTaskInfoImpl.MapTTTaskInfo;
+import org.apache.hadoop.mapred.TTTaskInfoImpl.ReduceTTTaskInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.system.DaemonProtocol;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+public privileged aspect TaskTrackerAspect {
+
+  declare parents : TaskTracker implements TTProtocol;
+
+  // Add a last-sent status field to the TaskTracker class.
+  TaskTrackerStatus TaskTracker.lastSentStatus = null;
+  public static String TaskTracker.TASKJARDIR = TaskTracker.JARSDIR;
+
+  public synchronized TaskTrackerStatus TaskTracker.getStatus()
+      throws IOException {
+    return lastSentStatus;
+  }
+
+  public Configuration TaskTracker.getDaemonConf() throws IOException {
+    return fConf;
+  }
+
+  public TTTaskInfo[] TaskTracker.getTasks() throws IOException {
+    List<TTTaskInfo> infoList = new ArrayList<TTTaskInfo>();
+    synchronized (tasks) {
+      for (TaskInProgress tip : tasks.values()) {
+        TTTaskInfo info = getTTTaskInfo(tip);
+        infoList.add(info);
+      }
+    }
+    return infoList.toArray(new TTTaskInfo[infoList.size()]);
+  }
+
+  public TTTaskInfo TaskTracker.getTask(org.apache.hadoop.mapreduce.TaskID id)
+      throws IOException {
+    TaskID old = org.apache.hadoop.mapred.TaskID.downgrade(id);
+    synchronized (tasks) {
+      for (TaskAttemptID ta : tasks.keySet()) {
+        if (old.equals(ta.getTaskID())) {
+          return getTTTaskInfo(tasks.get(ta));
+        }
+      }
+    }
+    return null;
+  }
+
+  private TTTaskInfo TaskTracker.getTTTaskInfo(TaskInProgress tip) {
+    TTTaskInfo info;
+    if (tip.task.isMapTask()) {
+      info = new MapTTTaskInfo(tip.slotTaken, tip.wasKilled,
+          (MapTaskStatus) tip.getStatus(), tip.getJobConf(), tip.getTask()
+              .getUser(), tip.getTask().isTaskCleanupTask(),
+          getPid(tip.getTask().getTaskID()));
+    } else {
+      info = new ReduceTTTaskInfo(tip.slotTaken, tip.wasKilled,
+          (ReduceTaskStatus) tip.getStatus(), tip.getJobConf(), tip.getTask()
+              .getUser(), tip.getTask().isTaskCleanupTask(),
+          getPid(tip.getTask().getTaskID()));
+    }
+    return info;
+  }
+
+  before(TaskTrackerStatus newStatus, TaskTracker tracker) :
+      set(TaskTrackerStatus TaskTracker.status)
+      && args(newStatus) && this(tracker) {
+    if (newStatus == null) {
+      tracker.lastSentStatus = tracker.status;
+    }
+  }
+
+  pointcut ttConstructorPointCut(JobConf conf) :
+      call(TaskTracker.new(JobConf))
+      && args(conf);
+
+  after(JobConf conf) returning (TaskTracker tracker) :
+      ttConstructorPointCut(conf) {
+    try {
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      tracker.setUser(ugi.getShortUserName());
+    } catch (IOException e) {
+      tracker.LOG.warn("Unable to get the user information for the "
+          + "TaskTracker");
+    }
+    tracker.setReady(true);
+  }
+
+  pointcut getVersionAspect(String protocol, long clientVersion) :
+      execution(public long TaskTracker.getProtocolVersion(String,
+      long) throws IOException) && args(protocol, clientVersion);
+
+  long around(String protocol, long clientVersion) :
+      getVersionAspect(protocol, clientVersion) {
+    if (protocol.equals(DaemonProtocol.class.getName())) {
+      return DaemonProtocol.versionID;
+    } else if (protocol.equals(TTProtocol.class.getName())) {
+      return TTProtocol.versionID;
+    } else {
+      return proceed(protocol, clientVersion);
+    }
+  }
+
+  public boolean TaskTracker.isProcessTreeAlive(String pid) throws IOException {
+    // The command to be executed is as follows:
+    // ps -o pid,ppid,sid,command -e | grep -v ps | grep -v grep | grep
+    // "$pid"
+    String checkerCommand =
+        getDaemonConf().get(
+            "test.system.processgroup_checker_command",
+            "ps -o pid,ppid,sid,command -e "
+                + "| grep -v ps | grep -v grep | grep \"$");
+    String[] command =
+        new String[] { "bash", "-c", checkerCommand + pid + "\"" };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(command);
+    try {
+      shexec.execute();
+    } catch (Shell.ExitCodeException e) {
+      TaskTracker.LOG
+          .info("The process tree grep threw an exit code exception, "
+              + "indicating that the process tree is not alive.");
+      return false;
+    }
+    TaskTracker.LOG.info("The task grep command is : "
+        + shexec.toString() + " the output from command is : "
        + shexec.getOutput());
+    return true;
+  }
+}