hadoop-common-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r784771 [1/2] - in /hadoop/core/branches/HADOOP-3628-2: ./ src/c++/task-controller/ src/core/org/apache/hadoop/fs/kfs/ src/core/org/apache/hadoop/util/ src/docs/src/documentation/content/xdocs/ src/examples/org/apache/hadoop/examples/ src/m...
Date: Mon, 15 Jun 2009 13:23:46 GMT
Author: stevel
Date: Mon Jun 15 13:23:44 2009
New Revision: 784771

URL: http://svn.apache.org/viewvc?rev=784771&view=rev
Log:
HADOOP-3628 merge with trunk

Added:
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/db/   (props changed)
      - copied from r784718, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java
      - copied unchanged from r784718, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBConfiguration.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
      - copied unchanged from r784718, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBInputFormat.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBOutputFormat.java
      - copied unchanged from r784718, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBOutputFormat.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBWritable.java
      - copied unchanged from r784718, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/DBWritable.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/db/package.html
      - copied unchanged from r784718, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/db/package.html
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/   (props changed)
      - copied from r784718, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java
      - copied unchanged from r784718, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/ControlledJob.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java
      - copied unchanged from r784718, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/package.html
      - copied unchanged from r784718, hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/package.html
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java
      - copied unchanged from r784718, hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcessesWithLinuxTaskController.java
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
      - copied unchanged from r784718, hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/   (props changed)
      - copied from r784718, hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java
      - copied unchanged from r784718, hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBJob.java
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java
      - copied unchanged from r784718, hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/db/TestDBOutputFormat.java
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/   (props changed)
      - copied from r784718, hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java
      - copied unchanged from r784718, hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/jobcontrol/TestMapReduceJobControl.java
Removed:
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/lib/db/TestDBJob.java
Modified:
    hadoop/core/branches/HADOOP-3628-2/   (props changed)
    hadoop/core/branches/HADOOP-3628-2/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/HADOOP-3628-2/ivybuild.xml
    hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/configuration.c
    hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/configuration.h.in
    hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/main.c
    hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/task-controller.c
    hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/task-controller.h
    hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java
    hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java
    hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSInputStream.java
    hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java
    hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
    hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/ProcessTree.java
    hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/Service.java
    hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/cluster_setup.xml
    hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/DBCountPageView.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskController.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/jobcontrol/Job.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/jobcontrol/JobControl.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBConfiguration.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBInputFormat.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
    hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/lib/db/DBWritable.java
    hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
    hadoop/core/branches/HADOOP-3628-2/src/test/core/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java
    hadoop/core/branches/HADOOP-3628-2/src/test/findbugsExcludeFile.xml
    hadoop/core/branches/HADOOP-3628-2/src/test/hdfs/org/apache/hadoop/hdfs/TestModTime.java
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java
    hadoop/core/branches/HADOOP-3628-2/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java

Propchange: hadoop/core/branches/HADOOP-3628-2/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jun 15 13:23:44 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:779103-783712
+/hadoop/core/trunk:779103-784731

Modified: hadoop/core/branches/HADOOP-3628-2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/CHANGES.txt?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/CHANGES.txt (original)
+++ hadoop/core/branches/HADOOP-3628-2/CHANGES.txt Mon Jun 15 13:23:44 2009
@@ -56,6 +56,18 @@
     HADOOP-4359. Support for data access authorization checking on Datanodes.
     (Kan Zhang via rangadi)
 
+    HADOOP-5690. Change org.apache.hadoop.examples.DBCountPageView to use 
+    new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
+    HADOOP-5694. Change org.apache.hadoop.examples.dancing to use new 
+    mapreduce api. (Amareshwari Sriramadasu via sharad)
+
+    HADOOP-5696. Change org.apache.hadoop.examples.Sort to use new 
+    mapreduce api. (Amareshwari Sriramadasu via sharad)
+
+    HADOOP-5698. Change org.apache.hadoop.examples.MultiFileWordCount to 
+    use new mapreduce api. (Amareshwari Sriramadasu via sharad)
+
   NEW FEATURES
 
     HADOOP-4268. Change fsck to use ClientProtocol methods so that the
@@ -414,12 +426,6 @@
     Option.withArgPattern. (Giridharan Kesavan and Sharad Agarwal via 
     sharad)
 
-    HADOOP-5698. Change org.apache.hadoop.examples.MultiFileWordCount to 
-    use new mapreduce api. (Amareshwari Sriramadasu via sharad)
-
-    HADOOP-5696. Change org.apache.hadoop.examples.Sort to use new 
-    mapreduce api. (Amareshwari Sriramadasu via sharad)
-
     HADOOP-5784. Makes the number of heartbeats that should arrive a second
     at the JobTracker configurable. (Amareshwari Sriramadasu via ddas)
 
@@ -435,8 +441,8 @@
     HADOOP-5961. DataNode process understand generic hadoop command line
     options (like -Ddfs.property=value). (Raghu Angadi)
 
-    HADOOP-5694. Change org.apache.hadoop.examples.dancing to use new 
-    mapreduce api. (Amareshwari Sriramadasu via sharad)
+    HADOOP-5938. Change org.apache.hadoop.mapred.jobcontrol to use new
+    api. (Amareshwari Sriramadasu via sharad)
 
   OPTIMIZATIONS
 
@@ -803,6 +809,17 @@
     HADOOP-5981. Fix a bug in HADOOP-2838 in parsing mapred.child.env.
     (Amar Kamat via sharad)
 
+    HADOOP-5420. Fix LinuxTaskController to kill tasks using the process
+    groups they are launched with.
+    (Sreekanth Ramakrishnan via yhemanth)
+
+    HADOOP-6031. Remove @author tags from Java source files.  (Ravi Phulari
+    via szetszwo)
+
+    HADOOP-5980. Fix LinuxTaskController so tasks get passed 
+    LD_LIBRARY_PATH and other environment variables.
+    (Sreekanth Ramakrishnan via yhemanth)
+
 Release 0.20.1 - Unreleased
 
   INCOMPATIBLE CHANGES
@@ -925,6 +942,12 @@
     causing TestQueueCapacities to fail.
     (Sreekanth Ramakrishnan via yhemanth)
 
+    HADOOP-5921. Fixes a problem in the JobTracker where it sometimes never used
+    to come up due to a system file creation on JobTracker's system-dir failing. 
+    This problem would sometimes show up only when the FS for the system-dir 
+    (usually HDFS) is started at nearly the same time as the JobTracker. 
+    (Amar Kamat via ddas)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/core/branches/HADOOP-3628-2/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jun 15 13:23:44 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:779103-783712
+/hadoop/core/trunk/CHANGES.txt:779103-784731

Modified: hadoop/core/branches/HADOOP-3628-2/ivybuild.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/ivybuild.xml?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/ivybuild.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/ivybuild.xml Mon Jun 15 13:23:44 2009
@@ -264,52 +264,55 @@
   <target name="svn-merge" depends="svn-init"
     description="merge in the trunk"  >
     <svn>
-      <arg line="merge ${trunk}"/>
+      <arg value="merge"/>
+      <arg value="${trunk}"/>
+      <arg value="--accept"/>
+      <arg value="postpone"/>
     </svn>
   </target>
   
   <target name="svn-diff" depends="svn-init"
     description="diff the local code against the branch"  >
     <svn>
-      <arg line="diff"/>
+      <arg value="diff"/>
     </svn>
   </target>
 
   <target name="svn-resolved" depends="svn-init"
     description="mark the tree as resolved"  >
     <svn>
-      <arg line="resolve"/>
+      <arg value="resolve"/>
     </svn>
   </target>
 
   <!--
   svn diff \
-  https://svn.apache.org/repos/asf/hadoop/core/branches/HADOOP-3628-2\
-  https://svn.apache.org/repos/asf/hadoop/core/trunk
+  https://svn.apache.org/repos/asf/hadoop/core/trunk \
+  https://svn.apache.org/repos/asf/hadoop/core/branches/HADOOP-3628-2
   -->
   <target name="svn-diff-trunk" depends="svn-init"
-    description="diff against trunk"  >
+      description="diff against trunk"  >
     <svn>
       <arg value="diff" />
-      <arg value="https://svn.apache.org/repos/asf/hadoop/core/branches/HADOOP-3628-2"/>
       <arg value="https://svn.apache.org/repos/asf/hadoop/core/trunk"/>
+      <arg value="https://svn.apache.org/repos/asf/hadoop/core/branches/HADOOP-3628-2"/>
     </svn>
   </target>
-    
-  
+
+
   <target name="svn-create-changelist" depends="svn-init"
-    description="Create a changelist of everything we want in the big patch"  >
+      description="Create a changelist of everything we want in the big patch"  >
     <property name="core/hadoop" value="src/core/org/apache/hadoop" />
-    <property name="hdfs/server" 
+    <property name="hdfs/server"
         value="src/hdfs/org/apache/hadoop/hdfs/server" />
     <property name="hadoop/mapred"
-        value="src/mapred/org/apache/hadoop/mapred/" />
-    <property name="test/core" 
+        value="src/mapred/org/apache/hadoop/mapred" />
+    <property name="test/core"
         value="src/test/org/apache/hadoop" />
-    <property name="test/hdfs/" 
+    <property name="test/hdfs"
         value="src/test/hdfs/org/apache/hadoop/hdfs" />
-    <property name="test/mapred" 
-        value="src/test/mapred/org/apache/hadoop/mapred/" />
+    <property name="test/mapred"
+        value="src/test/mapred/org/apache/hadoop/mapred" />
 
     <svn>
       <arg value="changelist"/>
@@ -321,13 +324,13 @@
       <arg value="${core/hadoop}/security/authorize/ConfiguredPolicy.java" />
       -->
       <arg value="${core/hadoop}/http/HttpServer.java" />
-      <arg value="${core/hadoop}/http/io/ThrowableWritable.java" />
+      <arg value="${core/hadoop}/io/ThrowableWritable.java" />
       <arg value="${core/hadoop}/util/Service.java" />
       <arg value="${core/hadoop}/util/MockService.java" />
-      
+
       <arg value="${hdfs/server}/datanode/DataNode.java" />
       <arg value="${hdfs/server}/datanode/FSDataset.java" />
-      <arg value="${hdfs/server}/namenode/BackupNode" />
+      <arg value="${hdfs/server}/namenode/BackupNode.java" />
       <arg value="${hdfs/server}/namenode/FSNamesystem.java" />
       <arg value="${hdfs/server}/namenode/NameNode.java" />
       <arg value="${hdfs/server}/namenode/PendingReplicationBlocks.java" />
@@ -342,20 +345,7 @@
       <arg value="${test/mapred}/TestTaskTrackerLifecycle.java" />
     </svn>
   </target>
-  
-  
-  <!--
-  svn diff \
-  https://svn.apache.org/repos/asf/hadoop/core/trunk \
-  https://svn.apache.org/repos/asf/hadoop/core/branches/HADOOP-3628-2
-  -->
-  <target name="svn-diff-trunk" depends="svn-init"
-    description="diff against trunk"  >
-    <svn>
-      <arg value="diff" />
-      <arg value="https://svn.apache.org/repos/asf/hadoop/core/trunk"/>
-      <arg value="https://svn.apache.org/repos/asf/hadoop/core/branches/HADOOP-3628-2"/>
-    </svn>
-  </target>                                                            
-  
+
+
+
 </project>
\ No newline at end of file

Modified: hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/configuration.c
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/c%2B%2B/task-controller/configuration.c?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/configuration.c (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/configuration.c Mon Jun 15 13:23:44 2009
@@ -202,3 +202,36 @@
   return NULL;
 }
 
+const char ** get_values(char * key) {
+  const char ** toPass = NULL;
+  const char * value = get_value(key);
+  char *tempTok = NULL;
+  char *tempstr = NULL;
+  int size = 0;
+  int len;
+  //first allocate any array of 10
+  if(value != NULL) {
+    toPass = (const char **) malloc(sizeof(char *) * MAX_SIZE);
+    tempTok = strtok_r((char *)value, ",", &tempstr);
+    if (tempTok != NULL) {
+      while (1) {
+        toPass[size++] = tempTok;
+        tempTok = strtok_r(NULL, ",", &tempstr);
+        if(tempTok == NULL){
+          break;
+        }
+        if((size % MAX_SIZE) == 0) {
+          toPass = (const char **) realloc(toPass,(sizeof(char *) *
+              (MAX_SIZE * ((size/MAX_SIZE) +1))));
+        }
+      }
+    } else {
+      toPass[size] = (char *)value;
+    }
+  }
+  if(size > 0) {
+    toPass[size] = NULL;
+  }
+  return toPass;
+}
+

Modified: hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/configuration.h.in
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/c%2B%2B/task-controller/configuration.h.in?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/configuration.h.in (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/configuration.h.in Mon Jun 15 13:23:44 2009
@@ -57,3 +57,6 @@
 //method to free allocated configuration
 void free_configurations();
 
+//function to return array of values pointing to the key. Values are
+//comma seperated strings.
+const char ** get_values(char* key);

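For context on the new helper declared above: get_values() splits a comma-separated configuration value and returns a NULL-terminated array of pointers. The following caller is only an illustrative sketch, not part of this commit; the key name and headers come from the surrounding patch, while the main() wrapper and output are assumptions.

    /* Illustrative caller of get_values() (not part of the commit).
     * Mirrors the patch's own usage: cast the result, walk the
     * NULL-terminated array, then free only the array itself, since the
     * strings point into the cached configuration value. */
    #include <stdio.h>
    #include <stdlib.h>
    #include "configuration.h"

    int main(void) {
      char key[] = "mapred.local.dir";   /* TT_SYS_DIR_KEY in the patch */
      char **dirs = (char **) get_values(key);
      if (dirs != NULL) {
        for (int i = 0; dirs[i] != NULL; i++) {
          printf("local dir %d: %s\n", i, dirs[i]);
        }
        free(dirs);
      }
      free_configurations();             /* release the cached configuration */
      return 0;
    }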
Modified: hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/c%2B%2B/task-controller/main.c?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/main.c (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/main.c Mon Jun 15 13:23:44 2009
@@ -25,15 +25,16 @@
   const char * task_id = NULL;
   const char * tt_root = NULL;
   int exit_code = 0;
+  const char * task_pid = NULL;
   const char* const short_options = "l:";
   const struct option long_options[] = { { "log", 1, NULL, 'l' }, { NULL, 0,
       NULL, 0 } };
 
   const char* log_file = NULL;
 
-  // when we support additional commands without ttroot, this check
-  // may become command specific.
-  if (argc < 6) {
+  //Minimum number of arguments required to run the task-controller
+  //command-name user command tt-root
+  if (argc < 3) {
     display_usage(stderr);
     return INVALID_ARGUMENT_NUMBER;
   }
@@ -44,7 +45,6 @@
   strncpy(hadoop_conf_dir,argv[0],(strlen(argv[0]) - strlen(EXEC_PATTERN)));
   hadoop_conf_dir[(strlen(argv[0]) - strlen(EXEC_PATTERN))] = '\0';
 #endif
-
   do {
     next_option = getopt_long(argc, argv, short_options, long_options, NULL);
     switch (next_option) {
@@ -88,24 +88,25 @@
   }
   optind = optind + 1;
   command = atoi(argv[optind++]);
-  job_id = argv[optind++];
-  task_id = argv[optind++];
-
 #ifdef DEBUG
   fprintf(LOGFILE, "main : command provided %d\n",command);
   fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
-  fprintf(LOGFILE, "main : job id %s \n", job_id);
-  fprintf(LOGFILE, "main : task id %s \n", task_id);
 #endif
   switch (command) {
-  case RUN_TASK:
-    tt_root = argv[optind];
+  case LAUNCH_TASK_JVM:
+    tt_root = argv[optind++];
+    job_id = argv[optind++];
+    task_id = argv[optind++];
     exit_code
         = run_task_as_user(user_detail->pw_name, job_id, task_id, tt_root);
     break;
-  case KILL_TASK:
-    tt_root = argv[optind];
-    exit_code = kill_user_task(user_detail->pw_name, job_id, task_id, tt_root);
+  case TERMINATE_TASK_JVM:
+    task_pid = argv[optind++];
+    exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGTERM);
+    break;
+  case KILL_TASK_JVM:
+    task_pid = argv[optind++];
+    exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
     break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;

Modified: hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/c%2B%2B/task-controller/task-controller.c?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/task-controller.c (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/task-controller.c Mon Jun 15 13:23:44 2009
@@ -23,9 +23,6 @@
 //LOGFILE
 FILE *LOGFILE;
 
-//hadoop temp dir root which is configured in secure configuration
-const char *mapred_local_dir;
-
 //placeholder for global cleanup operations
 void cleanup() {
   free_configurations();
@@ -36,10 +33,14 @@
   if (get_user_details(user) < 0) {
     return -1;
   }
-#ifdef DEBUG
-  fprintf(LOGFILE,"change_user : setting user as %s ", user_detail->pw_name);
-#endif
+
+  if(initgroups(user_detail->pw_name, user_detail->pw_gid) != 0) {
+	  cleanup();
+	  return SETUID_OPER_FAILED;
+  }
+
   errno = 0;
+
   setgid(user_detail->pw_gid);
   if (errno != 0) {
     fprintf(LOGFILE, "unable to setgid : %s\n", strerror(errno));
@@ -70,90 +71,61 @@
   return 0;
 }
 
-//Function to set the hadoop.temp.dir key from configuration.
-//would return -1 if the configuration is not proper.
-
-int get_mapred_local_dir() {
-
-  if (mapred_local_dir == NULL) {
-    mapred_local_dir = get_value(TT_SYS_DIR_KEY);
-  }
-
-  //after the call it should not be null
-  if (mapred_local_dir == NULL) {
-    return -1;
-  } else {
-    return 0;
-  }
-
-}
 // function to check if the passed tt_root is present in hadoop.tmp.dir
 int check_tt_root(const char *tt_root) {
-  char *token;
+  char ** mapred_local_dir;
   int found = -1;
 
   if (tt_root == NULL) {
     return -1;
   }
 
-  if (mapred_local_dir == NULL) {
-    if (get_mapred_local_dir() < 0) {
-      return -1;
-    }
-  }
+  mapred_local_dir = (char **)get_values(TT_SYS_DIR_KEY);
 
-  token = strtok((char *) mapred_local_dir, ",");
-  if (token == NULL && mapred_local_dir != NULL) {
-    token = (char *)mapred_local_dir;
+  if (mapred_local_dir == NULL) {
+    return -1;
   }
 
-  while (1) {
-    if (strcmp(tt_root, token) == 0) {
+  while(*mapred_local_dir != NULL) {
+    if(strcmp(*mapred_local_dir,tt_root) == 0) {
       found = 0;
       break;
     }
-    token = strtok(NULL, ",");
-    if (token == NULL) {
-      break;
-    }
   }
-
+  free(mapred_local_dir);
   return found;
-
 }
 
-/*
- *d function which would return .pid file path which is used while running
- * and killing of the tasks by the user.
- *
- * check TT_SYS_DIR for pattern
+/**
+ * Function to check if the constructed path and absolute
+ * path resolve to one and same.
  */
-void get_pid_path(const char * jobid, const char * taskid, const char *tt_root,
-    char ** pid_path) {
-
-  int str_len = strlen(TT_SYS_DIR) + strlen(jobid) + strlen(taskid) + strlen(
-      tt_root);
-  *pid_path = NULL;
 
-  if (mapred_local_dir == NULL) {
-    if (get_mapred_local_dir() < 0) {
-      return;
-    }
+int check_path(char *path) {
+  char * resolved_path = (char *) canonicalize_file_name(path);
+  if(resolved_path == NULL) {
+    return ERROR_RESOLVING_FILE_PATH;
+  }
+  if(strcmp(resolved_path, path) !=0) {
+    free(resolved_path);
+    return RELATIVE_PATH_COMPONENTS_IN_FILE_PATH;
   }
-
-  *pid_path = (char *) malloc(sizeof(char) * (str_len + 1));
-
-  if (*pid_path == NULL) {
-    fprintf(LOGFILE, "unable to allocate memory for pid path\n");
-    return;
+  free(resolved_path);
+  return 0;
+}
+/**
+ * Function to check if a user actually owns the file.
+ */
+int check_owner(uid_t uid, char *path) {
+  struct stat filestat;
+  if(stat(path, &filestat)!=0) {
+    return UNABLE_TO_STAT_FILE;
+  }
+  //check owner.
+  if(uid != filestat.st_uid){
+    return FILE_NOT_OWNED_BY_TASKTRACKER;
   }
-  memset(*pid_path,'\0',str_len+1);
-  snprintf(*pid_path, str_len, TT_SYS_DIR, tt_root, jobid, taskid);
-#ifdef DEBUG
-  fprintf(LOGFILE, "get_pid_path : pid path = %s\n", *pid_path);
-  fflush(LOGFILE);
-#endif
-
+  return 0;
 }
 
 /*
@@ -163,19 +135,19 @@
  */
 void get_task_file_path(const char * jobid, const char * taskid,
     const char * tt_root, char **task_script_path) {
+  const char ** mapred_local_dir = get_values(TT_SYS_DIR_KEY);
   *task_script_path = NULL;
   int str_len = strlen(TT_LOCAL_TASK_SCRIPT_PATTERN) + strlen(jobid) + (strlen(
       taskid)) + strlen(tt_root);
 
   if (mapred_local_dir == NULL) {
-    if (get_mapred_local_dir() < 0) {
-      return;
-    }
+    return;
   }
 
   *task_script_path = (char *) malloc(sizeof(char) * (str_len + 1));
   if (*task_script_path == NULL) {
     fprintf(LOGFILE, "Unable to allocate memory for task_script_path \n");
+    free(mapred_local_dir);
     return;
   }
 
@@ -186,13 +158,13 @@
   fprintf(LOGFILE, "get_task_file_path : task script path = %s\n", *task_script_path);
   fflush(LOGFILE);
 #endif
-
+  free(mapred_local_dir);
 }
 
 //end of private functions
 void display_usage(FILE *stream) {
   fprintf(stream,
-      "Usage: task-controller [-l logile] user command command-args\n");
+      "Usage: task-controller [-l logfile] user command command-args\n");
 }
 
 //function used to populate and user_details structure.
@@ -212,28 +184,20 @@
  *Function used to launch a task as the provided user.
  * First the function checks if the tt_root passed is found in
  * hadoop.temp.dir
- *
- *Then gets the path to which the task has to write its pid from
- *get_pid_path.
- *
- * THen writes its pid into the file.
- *
- * Then changes the permission of the pid file into 777
- *
- * Then uses get_task_file_path to fetch the task script file path.
- *
+ * Uses get_task_file_path to fetch the task script file path.
  * Does an execlp on the same in order to replace the current image with
  * task image.
- *
  */
 
 int run_task_as_user(const char * user, const char *jobid, const char *taskid,
     const char *tt_root) {
   char *task_script_path = NULL;
-  char *pid_path = NULL;
-  FILE *file_handle = NULL;
   int exit_code = 0;
-  int i = 0;
+  uid_t uid = getuid();
+
+  if(jobid == NULL || taskid == NULL) {
+    return INVALID_ARGUMENT_NUMBER;
+  }
 
 #ifdef DEBUG
   fprintf(LOGFILE,"run_task_as_user : Job id : %s \n", jobid);
@@ -241,7 +205,8 @@
   fprintf(LOGFILE,"run_task_as_user : tt_root : %s \n", tt_root);
   fflush(LOGFILE);
 #endif
-
+  //Check tt_root before switching the user, as reading configuration
+  //file requires privileged access.
   if (check_tt_root(tt_root) < 0) {
     fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
     cleanup();
@@ -257,44 +222,21 @@
     return SETUID_OPER_FAILED;
   }
 
-  get_pid_path(jobid, taskid, tt_root, &pid_path);
-
-  if (pid_path == NULL) {
+  get_task_file_path(jobid, taskid, tt_root, &task_script_path);
+  if (task_script_path == NULL) {
     cleanup();
-    return INVALID_PID_PATH;
+    return INVALID_TASK_SCRIPT_PATH;
   }
-
   errno = 0;
-  file_handle = fopen(pid_path, "w");
-
-  if (file_handle == NULL) {
-    exit_code = UNABLE_TO_OPEN_PID_FILE_WRITE_MODE;
+  exit_code = check_path(task_script_path);
+  if(exit_code != 0) {
     goto cleanup;
   }
-
   errno = 0;
-  if (fprintf(file_handle, "%d\n", getpid()) < 0) {
-    exit_code = UNABLE_TO_WRITE_TO_PID_FILE;
+  exit_code = check_owner(uid, task_script_path);
+  if(exit_code != 0) {
     goto cleanup;
   }
-
-  fflush(file_handle);
-  fclose(file_handle);
-  //set file handle to null after closing so it would not be double closed
-  //in cleanup label
-  file_handle = NULL;
-  //change the permissions of the file
-  errno = 0;
-  //free pid_t path which is allocated
-  free(pid_path);
-  pid_path = NULL;
-
-  get_task_file_path(jobid, taskid, tt_root, &task_script_path);
-
-  if (task_script_path == NULL) {
-    cleanup();
-    return INVALID_TASK_SCRIPT_PATH;
-  }
   errno = 0;
   cleanup();
   execlp(task_script_path, task_script_path, NULL);
@@ -306,83 +248,53 @@
   return exit_code;
 
 cleanup:
-  if (pid_path != NULL) {
-    free(pid_path);
-  }
   if (task_script_path != NULL) {
     free(task_script_path);
   }
-  if (file_handle != NULL) {
-    fclose(file_handle);
-  }
   // free configurations
   cleanup();
   return exit_code;
 }
+
 /**
- * Function used to terminate a task launched by the user.
- *
- * The function first checks if the passed tt-root is found in
- * configured hadoop.temp.dir (which is a list of tt_roots).
- *
- * Then gets the task-pid path using function get_pid_path.
- *
- * reads the task-pid from the file which is mentioned by get_pid_path
- *
- * kills the task by sending SIGTERM to that particular process.
- *
+ * Function used to terminate/kill a task launched by the user.
+ * The function sends appropriate signal to the process group
+ * specified by the task_pid.
  */
 
-int kill_user_task(const char *user, const char *jobid, const char *taskid,
-    const char *tt_root) {
+int kill_user_task(const char *user, const char *task_pid, int sig) {
   int pid = 0;
-  int i = 0;
-  char *pid_path = NULL;
-  FILE *file_handle = NULL;
-#ifdef DEBUG
-  fprintf(LOGFILE,"kill_user_task : Job id : %s \n", jobid);
-  fprintf(LOGFILE,"kill_user_task : task id : %s \n", taskid);
-  fprintf(LOGFILE,"kill_user_task : tt_root : %s \n", tt_root);
-  fflush(LOGFILE);
-#endif
 
-  if (check_tt_root(tt_root) < 0) {
-    fprintf(LOGFILE, "invalid tt root passed %s\n", tt_root);
-    cleanup();
-    return INVALID_TT_ROOT;
+  if(task_pid == NULL) {
+    return INVALID_ARGUMENT_NUMBER;
   }
+  pid = atoi(task_pid);
 
+  if(pid <= 0) {
+    return INVALID_TASK_PID;
+  }
   fclose(LOGFILE);
   fcloseall();
-
   if (change_user(user) != 0) {
     cleanup();
     return SETUID_OPER_FAILED;
   }
 
-  get_pid_path(jobid, taskid, tt_root, &pid_path);
-  if (pid_path == NULL) {
-    cleanup();
-    return INVALID_PID_PATH;
-  }
-  file_handle = fopen(pid_path, "r");
-  if (file_handle == NULL) {
-    free(pid_path);
-    cleanup();
-    return UNABLE_TO_OPEN_PID_FILE_READ_MODE;
-  }
-  fscanf(file_handle, "%d", &pid);
-  fclose(file_handle);
-  free(pid_path);
-  if (pid == 0) {
-    cleanup();
-    return UNABLE_TO_READ_PID;
+  //Don't continue if the process-group is not alive anymore.
+  if(kill(-pid,0) < 0) {
+    errno = 0;
+    return 0;
   }
-  if (kill(pid, SIGTERM) < 0) {
-    fprintf(LOGFILE, "%s\n", strerror(errno));
-    cleanup();
-    return UNABLE_TO_KILL_TASK;
+
+  if (kill(-pid, sig) < 0) {
+    if(errno != ESRCH) {
+      fprintf(LOGFILE, "Error is %s\n", strerror(errno));
+      cleanup();
+      return UNABLE_TO_KILL_TASK;
+    }
+    errno = 0;
   }
   cleanup();
   return 0;
 }
+

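The reworked kill_user_task() above no longer reads a pid file; it signals the task's process group directly by passing a negative pid to kill(2), first probing with signal 0 and ignoring ESRCH. The standalone sketch below shows that pattern only; it is not part of the commit, assumes POSIX, and the hard-coded pid string is purely illustrative.

    /* Sketch of the process-group signalling used by kill_user_task(). */
    #include <errno.h>
    #include <signal.h>
    #include <stdio.h>
    #include <stdlib.h>

    static int signal_task_group(const char *task_pid, int sig) {
      int pid = atoi(task_pid);
      if (pid <= 0) {
        return 1;                    /* corresponds to INVALID_TASK_PID */
      }
      if (kill(-pid, 0) < 0) {       /* group already gone: nothing to do */
        return 0;
      }
      if (kill(-pid, sig) < 0 && errno != ESRCH) {
        perror("kill");
        return 1;                    /* corresponds to UNABLE_TO_KILL_TASK */
      }
      return 0;
    }

    int main(void) {
      /* TERMINATE_TASK_JVM sends SIGTERM; KILL_TASK_JVM sends SIGKILL. */
      return signal_task_group("12345", SIGTERM);
    }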
Modified: hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/c%2B%2B/task-controller/task-controller.h?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/task-controller.h (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/c++/task-controller/task-controller.h Mon Jun 15 13:23:44 2009
@@ -28,42 +28,37 @@
 #include <sys/stat.h>
 #include <sys/signal.h>
 #include <getopt.h>
+#include<grp.h>
 #include "configuration.h"
 
 //command definitions
 enum command {
-  RUN_TASK,
-  KILL_TASK
+  LAUNCH_TASK_JVM,
+  TERMINATE_TASK_JVM,
+  KILL_TASK_JVM
 };
 
 enum errorcodes {
   INVALID_ARGUMENT_NUMBER = 1,
-  INVALID_USER_NAME,
-  INVALID_COMMAND_PROVIDED,
-  SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS,
-  OUT_OF_MEMORY,
-  INVALID_TT_ROOT,
-  INVALID_PID_PATH,
-  UNABLE_TO_OPEN_PID_FILE_WRITE_MODE,
-  UNABLE_TO_OPEN_PID_FILE_READ_MODE,
-  UNABLE_TO_WRITE_TO_PID_FILE,
-  SETUID_OPER_FAILED,
-  INVALID_TASK_SCRIPT_PATH,
-  UNABLE_TO_EXECUTE_TASK_SCRIPT,
-  UNABLE_TO_READ_PID,
-  UNABLE_TO_KILL_TASK,
-  UNABLE_TO_FIND_PARENT_PID_FILE,
-  TASK_CONTROLLER_SPAWNED_BY_INVALID_PARENT_PROCESS,
-  UNABLE_TO_READ_PARENT_PID
+  INVALID_USER_NAME, //2
+  INVALID_COMMAND_PROVIDED, //3
+  SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4
+  INVALID_TT_ROOT, //5
+  SETUID_OPER_FAILED, //6
+  INVALID_TASK_SCRIPT_PATH, //7
+  UNABLE_TO_EXECUTE_TASK_SCRIPT, //8
+  UNABLE_TO_KILL_TASK, //9
+  INVALID_PROCESS_LAUNCHING_TASKCONTROLLER, //10
+  INVALID_TASK_PID, //11
+  ERROR_RESOLVING_FILE_PATH, //12
+  RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //13
+  UNABLE_TO_STAT_FILE, //14
+  FILE_NOT_OWNED_BY_TASKTRACKER //15
 };
 
 
-#define TT_PID_PATTERN "%s/hadoop-%s-tasktracker.pid"
-
 #define TT_LOCAL_TASK_SCRIPT_PATTERN "%s/taskTracker/jobcache/%s/%s/taskjvm.sh"
 
-#define TT_SYS_DIR "%s/taskTracker/jobcache/%s/%s/.pid"
-
 #define TT_SYS_DIR_KEY "mapred.local.dir"
 
 #define MAX_ITEMS 10
@@ -81,8 +76,6 @@
 
 int run_task_as_user(const char * user, const char *jobid, const char *taskid, const char *tt_root);
 
-int verify_parent();
-
-int kill_user_task(const char *user, const char *jobid, const char *taskid, const char *tt_root);
+int kill_user_task(const char *user, const char *task_pid, int sig);
 
 int get_user_details(const char *user);

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/IFSImpl.java Mon Jun 15 13:23:44 2009
@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * We need to provide the ability to the code in fs/kfs without really
  * having a KFS deployment.  In particular, the glue code that wraps

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSImpl.java Mon Jun 15 13:23:44 2009
@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * Provide the implementation of KFS which turn into calls to KfsAccess.
  */

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSInputStream.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSInputStream.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSInputStream.java Mon Jun 15 13:23:44 2009
@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * Implements the Hadoop FSInputStream interfaces to allow applications to read
  * files in Kosmos File System (KFS).

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KFSOutputStream.java Mon Jun 15 13:23:44 2009
@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * Implements the Hadoop FSOutputStream interfaces to allow applications to write to
  * files in Kosmos File System (KFS).

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/fs/kfs/KosmosFileSystem.java Mon Jun 15 13:23:44 2009
@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * Implements the Hadoop FS interfaces to allow applications to store
  *files in Kosmos File System (KFS).

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/ProcessTree.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/ProcessTree.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/ProcessTree.java Mon Jun 15 13:23:44 2009
@@ -54,73 +54,24 @@
   }
 
   /**
-   * Kills the process(OR process group) by sending the signal SIGKILL
-   * in the current thread
-   * @param pid Process id(OR process group id) of to-be-deleted-process
-   * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
-   * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after
-   *  sending SIGTERM
-   */
-  private static void sigKillInCurrentThread(String pid, boolean isProcessGroup,
-      long sleepTimeBeforeSigKill) {
-    // Kill the subprocesses of root process(even if the root process is not
-    // alive) if process group is to be killed.
-    if (isProcessGroup || ProcessTree.isAlive(pid)) {
-      try {
-        // Sleep for some time before sending SIGKILL
-        Thread.sleep(sleepTimeBeforeSigKill);
-      } catch (InterruptedException i) {
-        LOG.warn("Thread sleep is interrupted.");
-      }
-
-      ShellCommandExecutor shexec = null;
-
-      try {
-        String pid_pgrpid;
-        if(isProcessGroup) {//kill the whole process group
-          pid_pgrpid = "-" + pid;
-        }
-        else {//kill single process
-          pid_pgrpid = pid;
-        }
-        
-        String[] args = { "kill", "-9", pid_pgrpid };
-        shexec = new ShellCommandExecutor(args);
-        shexec.execute();
-      } catch (IOException ioe) {
-        LOG.warn("Error executing shell command " + ioe);
-      } finally {
-        if(isProcessGroup) {
-          LOG.info("Killing process group" + pid + " with SIGKILL. Exit code "
-            + shexec.getExitCode());
-        }
-        else {
-          LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
-                    + shexec.getExitCode());
-        }
-      }
-    }
-  }
-
-  /** Kills the process(OR process group) by sending the signal SIGKILL
-   * @param pid Process id(OR process group id) of to-be-deleted-process
-   * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
+   * Destroy the process-tree.
+   * @param pid process id of the root process of the subtree of processes
+   *            to be killed
    * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
    *                               after sending SIGTERM
+   * @param isProcessGroup pid is a process group leader or not
    * @param inBackground Process is to be killed in the back ground with
    *                     a separate thread
    */
-  private static void sigKill(String pid, boolean isProcessGroup,
-                        long sleeptimeBeforeSigkill, boolean inBackground) {
-
-    if(inBackground) { // use a separate thread for killing
-      SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup,
-                                                      sleeptimeBeforeSigkill);
-      sigKillThread.setDaemon(true);
-      sigKillThread.start();
+  public static void destroy(String pid, long sleeptimeBeforeSigkill,
+                             boolean isProcessGroup, boolean inBackground) {
+    if(isProcessGroup) {
+      destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
     }
     else {
-      sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill);
+      //TODO: Destroy all the processes in the subtree in this case also.
+      // For the time being, killing only the root process.
+      destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
     }
   }
 
@@ -133,6 +84,29 @@
    */
   protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill,
                                     boolean inBackground) {
+    terminateProcess(pid);
+    sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
+  }
+
+  /** Destroy the process group.
+   * @param pgrpId Process group id of to-be-killed-processes
+   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+   *                               after sending SIGTERM
+   * @param inBackground Process group is to be killed in the back ground with
+   *                     a separate thread
+   */
+  protected static void destroyProcessGroup(String pgrpId,
+                       long sleeptimeBeforeSigkill, boolean inBackground) {
+    terminateProcessGroup(pgrpId);
+    sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
+  }
+
+  /**
+   * Sends terminate signal to the process, allowing it to gracefully exit.
+   * 
+   * @param pid pid of the process to be sent SIGTERM
+   */
+  public static void terminateProcess(String pid) {
     ShellCommandExecutor shexec = null;
     try {
       String[] args = { "kill", pid };
@@ -144,19 +118,15 @@
       LOG.info("Killing process " + pid +
                " with SIGTERM. Exit code " + shexec.getExitCode());
     }
-    
-    sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
   }
-  
-  /** Destroy the process group.
-   * @param pgrpId Process group id of to-be-killed-processes
-   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
-   *                               after sending SIGTERM
-   * @param inBackground Process group is to be killed in the back ground with
-   *                     a separate thread
+
+  /**
+   * Sends terminate signal to all the process belonging to the passed process
+   * group, allowing the group to gracefully exit.
+   * 
+   * @param pgrpId process group id
    */
-  protected static void destroyProcessGroup(String pgrpId,
-                       long sleeptimeBeforeSigkill, boolean inBackground) {
+  public static void terminateProcessGroup(String pgrpId) {
     ShellCommandExecutor shexec = null;
     try {
       String[] args = { "kill", "--", "-" + pgrpId };
@@ -168,37 +138,115 @@
       LOG.info("Killing all processes in the process group " + pgrpId +
                " with SIGTERM. Exit code " + shexec.getExitCode());
     }
-    
-    sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
   }
 
   /**
-   * Destroy the process-tree.
-   * @param pid process id of the root process of the subtree of processes
-   *            to be killed
+   * Kills the process(OR process group) by sending the signal SIGKILL
+   * in the current thread
+   * @param pid Process id(OR process group id) of to-be-deleted-process
+   * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
+   * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after
+   *  sending SIGTERM
+   */
+  private static void sigKillInCurrentThread(String pid, boolean isProcessGroup,
+      long sleepTimeBeforeSigKill) {
+    // Kill the subprocesses of root process(even if the root process is not
+    // alive) if process group is to be killed.
+    if (isProcessGroup || ProcessTree.isAlive(pid)) {
+      try {
+        // Sleep for some time before sending SIGKILL
+        Thread.sleep(sleepTimeBeforeSigKill);
+      } catch (InterruptedException i) {
+        LOG.warn("Thread sleep is interrupted.");
+      }
+      if(isProcessGroup) {
+        killProcessGroup(pid);
+      } else {
+        killProcess(pid);
+      }
+    }  
+  }
+  
+
+  /** Kills the process(OR process group) by sending the signal SIGKILL
+   * @param pid Process id(OR process group id) of to-be-deleted-process
+   * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
    * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
    *                               after sending SIGTERM
-   * @param isProcessGroup pid is a process group leader or not
    * @param inBackground Process is to be killed in the back ground with
    *                     a separate thread
    */
-  public static void destroy(String pid, long sleeptimeBeforeSigkill,
-                             boolean isProcessGroup, boolean inBackground) {
-    if(isProcessGroup) {
-      destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
+  private static void sigKill(String pid, boolean isProcessGroup,
+                        long sleeptimeBeforeSigkill, boolean inBackground) {
+
+    if(inBackground) { // use a separate thread for killing
+      SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup,
+                                                      sleeptimeBeforeSigkill);
+      sigKillThread.setDaemon(true);
+      sigKillThread.start();
     }
     else {
-      //TODO: Destroy all the processes in the subtree in this case also.
-      // For the time being, killing only the root process.
-      destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
+      sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill);
+    }
+  }
+
+  /**
+   * Sends kill signal to process, forcefully terminating the process.
+   * 
+   * @param pid process id
+   */
+  public static void killProcess(String pid) {
+
+    //If process tree is not alive then return immediately.
+    if(!ProcessTree.isAlive(pid)) {
+      return;
+    }
+    String[] args = { "kill", "-9", pid };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(args);
+    try {
+      shexec.execute();
+    } catch (IOException e) {
+      LOG.warn("Error sending SIGKILL to process "+ pid + " ."+ 
+          StringUtils.stringifyException(e));
+    } finally {
+      LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
+          + shexec.getExitCode());
     }
   }
 
+  /**
+   * Sends kill signal to all process belonging to same process group,
+   * forcefully terminating the process group.
+   * 
+   * @param pgrpId process group id
+   */
+  public static void killProcessGroup(String pgrpId) {
+
+    //If process tree is not alive then return immediately.
+    if(!ProcessTree.isProcessGroupAlive(pgrpId)) {
+      return;
+    }
 
+    String[] args = { "kill", "-9", "-"+pgrpId };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(args);
+    try {
+      shexec.execute();
+    } catch (IOException e) {
+      LOG.warn("Error sending SIGKILL to process group "+ pgrpId + " ."+ 
+          StringUtils.stringifyException(e));
+    } finally {
+      LOG.info("Killing process group" + pgrpId + " with SIGKILL. Exit code "
+          + shexec.getExitCode());
+    }
+  }
+  
   /**
    * Is the process with PID pid still alive?
    * This method assumes that isAlive is called on a pid that was alive not
    * too long ago, and hence assumes no chance of pid-wrapping-around.
+   * 
+   * @param pid pid of the process to check.
+   * @return true if process is alive.
    */
   public static boolean isAlive(String pid) {
     ShellCommandExecutor shexec = null;
@@ -215,6 +263,32 @@
     }
     return (shexec.getExitCode() == 0 ? true : false);
   }
+  
+  /**
+   * Is the process group with  still alive?
+   * 
+   * This method assumes that isAlive is called on a pid that was alive not
+   * too long ago, and hence assumes no chance of pid-wrapping-around.
+   * 
+   * @param pgrpId process group id
+   * @return true if any of process in group is alive.
+   */
+  public static boolean isProcessGroupAlive(String pgrpId) {
+    ShellCommandExecutor shexec = null;
+    try {
+      String[] args = { "kill", "-0", "-"+pgrpId };
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (ExitCodeException ee) {
+      return false;
+    } catch (IOException ioe) {
+      LOG.warn("Error executing shell command "
+          + Arrays.toString(shexec.getExecString()) + ioe);
+      return false;
+    }
+    return (shexec.getExitCode() == 0 ? true : false);
+  }
+  
 
   /**
    * Helper thread class that kills process-tree with SIGKILL in background

Modified: hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/Service.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/Service.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/Service.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/core/org/apache/hadoop/util/Service.java Mon Jun 15 13:23:44 2009
@@ -208,7 +208,7 @@
    * @throws IOException           for any ping failure
    * @throws ServiceStateException if the component is in a wrong state.
    */
-  public ServiceStatus ping() throws IOException {
+  protected ServiceStatus ping() throws IOException {
     ServiceStatus status = new ServiceStatus(this);
     ServiceState state = status.getState();
     if (state == ServiceState.LIVE) {

Modified: hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/docs/src/documentation/content/xdocs/cluster_setup.xml Mon Jun 15 13:23:44 2009
@@ -696,7 +696,10 @@
             
             <p>
             The executable must be deployed as a setuid executable, by changing
-            the ownership to <em>root</em> and giving it permissions <em>4755</em>. 
+            the ownership to <em>root</em>, group ownership to that of tasktracker
+            and giving it permissions <em>4510</em>.Please take a note that,
+            group which owns task-controller should contain only tasktracker
+            as its memeber and not users who submit jobs.
             </p>
             
             <p>The executable requires a configuration file called 

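The cluster_setup.xml change above tightens the documented deployment: the task-controller binary should be owned by root, group-owned by the tasktracker's group (with no job-submitting users in that group), and installed with mode 4510 so that only the tasktracker can execute it. As a small illustration, a stat(2)-based check of those requirements might look like the sketch below; it is not part of the commit, and the binary path is an assumed example.

    /* Sketch that checks the documented deployment of task-controller:
     * owned by root, setuid bit set, permissions 4510. Path is assumed. */
    #include <stdio.h>
    #include <sys/stat.h>

    int main(void) {
      const char *path = "/usr/local/hadoop/bin/task-controller"; /* assumed */
      struct stat st;
      if (stat(path, &st) != 0) {
        perror("stat");
        return 1;
      }
      if (st.st_uid != 0) {
        fprintf(stderr, "task-controller is not owned by root\n");
      }
      if ((st.st_mode & 07777) != 04510) {
        fprintf(stderr, "mode is %04o, expected 4510\n",
                (unsigned) (st.st_mode & 07777));
      }
      return 0;
    }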
Modified: hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/DBCountPageView.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/DBCountPageView.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/DBCountPageView.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/examples/org/apache/hadoop/examples/DBCountPageView.java Mon Jun 15 13:23:44 2009
@@ -32,23 +32,20 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.lib.LongSumReducer;
-import org.apache.hadoop.mapred.lib.db.DBConfiguration;
-import org.apache.hadoop.mapred.lib.db.DBInputFormat;
-import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
-import org.apache.hadoop.mapred.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -79,7 +76,8 @@
   private static final String[] AccessFieldNames = {"url", "referrer", "time"};
   private static final String[] PageviewFieldNames = {"url", "pageview"};
   
-  private static final String DB_URL = "jdbc:hsqldb:hsql://localhost/URLAccess";
+  private static final String DB_URL = 
+    "jdbc:hsqldb:hsql://localhost/URLAccess";
   private static final String DRIVER_CLASS = "org.hsqldb.jdbcDriver";
   
   private Server server;
@@ -87,7 +85,7 @@
   private void startHsqldbServer() {
     server = new Server();
     server.setDatabasePath(0, 
-        System.getProperty("test.build.data",".") + "/URLAccess");
+        System.getProperty("test.build.data", "/tmp") + "/URLAccess");
     server.setDatabaseName(0, "URLAccess");
     server.start();
   }
@@ -193,10 +191,11 @@
 
 
       //Pages in the site :
-      String[] pages = {"/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h", "/i", "/j"};
+      String[] pages = {"/a", "/b", "/c", "/d", "/e", 
+                        "/f", "/g", "/h", "/i", "/j"};
       //linkMatrix[i] is the array of pages(indexes) that page_i links to.  
-      int[][] linkMatrix = {{1,5,7}, {0,7,4,6,}, {0,1,7,8}, {0,2,4,6,7,9}, {0,1},
-          {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}};
+      int[][] linkMatrix = {{1,5,7}, {0,7,4,6,}, {0,1,7,8}, 
+        {0,2,4,6,7,9}, {0,1}, {0,3,5,9}, {0}, {0,1,3}, {0,2,6}, {0,2,6}};
 
       //a mini model of user browsing a la pagerank
       int currentPage = random.nextInt(pages.length); 
@@ -211,7 +210,8 @@
 
         int action = random.nextInt(PROBABILITY_PRECISION);
 
-        //go to a new page with probability NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION
+        // go to a new page with probability 
+        // NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION
         if(action < NEW_PAGE_PROBABILITY) { 
           currentPage = random.nextInt(pages.length); // a random page
           referrer = null;
@@ -337,17 +337,15 @@
    * Mapper extracts URLs from the AccessRecord (tuples from db), 
    * and emits a &lt;url,1&gt; pair for each access record. 
    */
-  static class PageviewMapper extends MapReduceBase 
-    implements Mapper<LongWritable, AccessRecord, Text, LongWritable> {
+  static class PageviewMapper extends 
+      Mapper<LongWritable, AccessRecord, Text, LongWritable> {
     
     LongWritable ONE = new LongWritable(1L);
     @Override
-    public void map(LongWritable key, AccessRecord value,
-        OutputCollector<Text, LongWritable> output, Reporter reporter)
-        throws IOException {
-      
+    public void map(LongWritable key, AccessRecord value, Context context)
+        throws IOException, InterruptedException {
       Text oKey = new Text(value.url);
-      output.collect(oKey, ONE);
+      context.write(oKey, ONE);
     }
   }
   
@@ -355,20 +353,19 @@
    * Reducer sums up the pageviews and emits a PageviewRecord, 
    * which will correspond to one tuple in the db.
    */
-  static class PageviewReducer extends MapReduceBase 
-    implements Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
+  static class PageviewReducer extends 
+      Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
     
     NullWritable n = NullWritable.get();
     @Override
-    public void reduce(Text key, Iterator<LongWritable> values,
-        OutputCollector<PageviewRecord, NullWritable> output, Reporter reporter)
-        throws IOException {
+    public void reduce(Text key, Iterable<LongWritable> values, 
+        Context context) throws IOException, InterruptedException {
       
       long sum = 0L;
-      while(values.hasNext()) {
-        sum += values.next().get();
+      for(LongWritable value: values) {
+        sum += value.get();
       }
-      output.collect(new PageviewRecord(key.toString(), sum), n);
+      context.write(new PageviewRecord(key.toString(), sum), n);
     }
   }
   
@@ -385,17 +382,18 @@
     }
     
     initialize(driverClassName, url);
+    Configuration conf = getConf();
+
+    DBConfiguration.configureDB(conf, driverClassName, url);
 
-    JobConf job = new JobConf(getConf(), DBCountPageView.class);
+    Job job = new Job(conf);
         
     job.setJobName("Count Pageviews of URLs");
-
+    job.setJarByClass(DBCountPageView.class);
     job.setMapperClass(PageviewMapper.class);
     job.setCombinerClass(LongSumReducer.class);
     job.setReducerClass(PageviewReducer.class);
 
-    DBConfiguration.configureDB(job, driverClassName, url);
-    
     DBInputFormat.setInput(job, AccessRecord.class, "Access"
         , null, "url", AccessFieldNames);
 
@@ -406,10 +404,9 @@
 
     job.setOutputKeyClass(PageviewRecord.class);
     job.setOutputValueClass(NullWritable.class);
-
+    int ret;
     try {
-      JobClient.runJob(job);
-      
+      ret = job.waitForCompletion(true) ? 0 : 1;
       boolean correct = verify();
       if(!correct) {
         throw new RuntimeException("Evaluation was not correct!");
@@ -417,7 +414,7 @@
     } finally {
       shutdown();    
     }
-    return 0;
+    return ret;
   }
 
   public static void main(String[] args) throws Exception {

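For context, a self-contained sketch of the old-API to new-API migration pattern this file now follows: org.apache.hadoop.mapreduce.Mapper/Reducer with a Context instead of OutputCollector/Reporter, Iterable values in reduce(), and Job/waitForCompletion() instead of JobConf/JobClient. Class and job names below are illustrative only, not part of the patch.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // Illustrative only: counts occurrences of each input value using the
    // new mapreduce API, mirroring the idioms introduced by this patch.
    public class ValueCount {

      static class CountMapper
          extends Mapper<LongWritable, Text, Text, LongWritable> {
        private static final LongWritable ONE = new LongWritable(1L);
        @Override
        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          context.write(value, ONE);           // emit <value, 1>
        }
      }

      static class SumReducer
          extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
          long sum = 0L;
          for (LongWritable v : values) {       // Iterable, not Iterator
            sum += v.get();
          }
          context.write(key, new LongWritable(sum));
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJobName("value count");
        job.setJarByClass(ValueCount.class);
        job.setMapperClass(CountMapper.class);
        job.setCombinerClass(SumReducer.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // input/output formats and paths omitted for brevity
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
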
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Mon Jun 15 13:23:44 2009
@@ -59,43 +59,7 @@
     context.shExec = shexec;
     shexec.execute();
   }
-  
-  /**
-   * Kills the JVM running the task stored in the context.
-   * 
-   * @param context the context storing the task running within the JVM
-   * that needs to be killed.
-   */
-  void killTaskJVM(TaskController.TaskControllerContext context) {
-    ShellCommandExecutor shexec = context.shExec;
-
-    if (shexec != null) {
-      Process process = shexec.getProcess();
-      if (Shell.WINDOWS) {
-        // Currently we don't use setsid on WINDOWS. So kill the process alone.
-        if (process != null) {
-          process.destroy();
-        }
-      }
-      else { // In addition to the task JVM, kill its subprocesses also.
-        String pid = context.pid;
-        if (pid != null) {
-          ProcessTree.destroy(pid, context.sleeptimeBeforeSigkill,
-              ProcessTree.isSetsidAvailable, false);
-          try {
-            if (process != null) {
-              LOG.info("Process exited with exit code:" + process.waitFor());
-            }
-          } catch (InterruptedException ie) {}
-        }
-        else if (process != null) {
-          // kill the task JVM alone, if we don't have the process group id
-          process.destroy();
-        }
-      }
-    }
-  }
-  
+    
   /**
    * Initialize the task environment.
    * 
@@ -123,5 +87,50 @@
   @Override
   void initializeJob(JobID jobId) {
   }
+
+  @Override
+  void terminateTask(TaskControllerContext context) {
+    ShellCommandExecutor shexec = context.shExec;
+    if (shexec != null) {
+      Process process = shexec.getProcess();
+      if (Shell.WINDOWS) {
+        // Currently we don't use setsid on WINDOWS,
+        // so kill the process alone.
+        if (process != null) {
+          process.destroy();
+        }
+      }
+      else { // In addition to the task JVM, kill its subprocesses also.
+        String pid = context.pid;
+        if (pid != null) {
+          if(ProcessTree.isSetsidAvailable) {
+            ProcessTree.terminateProcessGroup(pid);
+          }else {
+            ProcessTree.terminateProcess(pid);
+          }
+        }
+      }
+    }
+  }
+  
+  @Override
+  void killTask(TaskControllerContext context) {
+    ShellCommandExecutor shexec = context.shExec;
+    if (shexec != null) {
+      if (Shell.WINDOWS) {
+        // We don't send a kill signal on Windows, as we have
+        // already called process.destroy() in terminateTask().
+        return;
+      }
+      String pid = context.pid;
+      if (pid != null) {
+        if(ProcessTree.isSetsidAvailable) {
+          ProcessTree.killProcessGroup(pid);
+        }else {
+          ProcessTree.killProcess(pid);
+        }
+      }
+    }
+  }
   
 }

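The terminateTask()/killTask() split follows the usual SIGTERM-then-SIGKILL sequence, signalling the whole process group when setsid was used to launch the JVM and just the single pid otherwise. A rough OS-level illustration of what the ProcessTree calls amount to on a POSIX system (assumed behaviour for illustration, not the actual ProcessTree implementation):

    import java.io.IOException;

    // Sketch only: each method shells out to /bin/kill; "-pid" addresses
    // the process group created by setsid when the task JVM was launched.
    public class SignalSketch {

      // graceful phase: SIGTERM to the task JVM only
      static void terminateProcess(String pid) throws IOException {
        new ProcessBuilder("kill", "-s", "TERM", "--", pid).start();
      }

      // graceful phase: SIGTERM to the whole process group
      static void terminateProcessGroup(String pid) throws IOException {
        new ProcessBuilder("kill", "-s", "TERM", "--", "-" + pid).start();
      }

      // forceful phase: SIGKILL to the task JVM only
      static void killProcess(String pid) throws IOException {
        new ProcessBuilder("kill", "-s", "KILL", "--", pid).start();
      }

      // forceful phase: SIGKILL to the whole process group
      static void killProcessGroup(String pid) throws IOException {
        new ProcessBuilder("kill", "-s", "KILL", "--", "-" + pid).start();
      }
    }
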
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Jun 15 13:23:44 2009
@@ -123,7 +123,7 @@
   private int NUM_HEARTBEATS_IN_SECOND = 100;
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
-  private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
+  private static final int FS_ACCESS_RETRY_PERIOD = 10000;
   /**
    * Time in milliseconds to sleep while trying to start the job tracker:
    * {@value}
@@ -1205,17 +1205,38 @@
         shouldRecover = false;
 
         // write the jobtracker.info file
-        FSDataOutputStream out = FileSystem.create(fs, restartFile, filePerm);
-        out.writeInt(0);
-        out.close();
+        try {
+          FSDataOutputStream out = FileSystem.create(fs, restartFile, 
+                                                     filePerm);
+          out.writeInt(0);
+          out.close();
+        } catch (IOException ioe) {
+          LOG.warn("Writing to file " + restartFile + " failed!");
+          LOG.warn("FileSystem is not ready yet!");
+          fs.delete(restartFile, false);
+          throw ioe;
+        }
         return;
       }
 
       FSDataInputStream in = fs.open(restartFile);
-      // read the old count
-      restartCount = in.readInt();
-      ++restartCount; // increment the restart count
-      in.close();
+      try {
+        // read the old count
+        restartCount = in.readInt();
+        ++restartCount; // increment the restart count
+      } catch (IOException ioe) {
+        LOG.warn("System directory is garbled. Failed to read file " 
+                 + restartFile);
+        LOG.warn("Jobtracker recovery is not possible with garbled"
+                 + " system directory! Please delete the system directory and"
+                 + " restart the jobtracker. Note that deleting the system" 
+                 + " directory will result in loss of all the running jobs.");
+        throw new RuntimeException(ioe);
+      } finally {
+        if (in != null) {
+          in.close();
+        }
+      }
 
       // Write back the new restart count and rename the old info file
       //TODO This is similar to jobhistory recovery, maybe this common code
@@ -1762,29 +1783,12 @@
                 ie);
       }
       try {
-        Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
       } catch (InterruptedException e) {
         throw new IOException("Interrupted during system directory cleanup ",
                 e);
       }
     }
-
-    // Prepare for recovery. This is done irrespective of the status of restart
-    // flag.
-    try {
-      recoveryManager.updateRestartCount();
-    } catch (IOException ioe) {
-      LOG.warn("Failed to initialize recovery manager. The Recovery manager "
-               + "failed to access the system files in the system dir (" 
-               + getSystemDir() + ")."); 
-      LOG.warn("It might be because the JobTracker failed to read/write system"
-               + " files (" + recoveryManager.getRestartCountFile() + " / " 
-               + recoveryManager.getTempRestartCountFile() + ") or the system "
-               + " file " + recoveryManager.getRestartCountFile() 
-               + " is missing!");
-      LOG.warn("Bailing out...");
-      throw ioe;
-    }
     
     // Same with 'localDir' except it's always on the local disk.
     jobConf.deleteLocalFiles(SUBDIR);
@@ -1912,6 +1916,20 @@
       //catch re-entrancy by returning early
       return;
     };
+    // Prepare for recovery. This is done irrespective of the status of restart
+    // flag.
+    while (true) {
+      try {
+        recoveryManager.updateRestartCount();
+        break;
+      } catch (IOException ioe) {
+        LOG.warn("Failed to initialize recovery manager. ", ioe);
+        // wait for some time
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+        LOG.warn("Retrying...");
+      }
+    }
+
     taskScheduler.start();
     
     //  Start the recovery after starting the scheduler

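The updateRestartCount() call moves from a fail-fast block during startup into a retry loop later on, so the JobTracker now waits for the FileSystem to become ready instead of bailing out. A generic sketch of that retry-until-success idiom (names are illustrative; the period simply mirrors FS_ACCESS_RETRY_PERIOD):

    import java.util.concurrent.Callable;

    // Generic retry-until-success sketch of the idiom the patch applies to
    // recoveryManager.updateRestartCount().
    public class RetryUntilReady {
      static final long RETRY_PERIOD_MS = 10000;

      static <T> T retry(Callable<T> action) throws InterruptedException {
        while (true) {
          try {
            return action.call();
          } catch (Exception e) {
            // the patch logs a warning and sleeps before the next attempt
            System.err.println("Attempt failed, retrying: " + e);
            Thread.sleep(RETRY_PERIOD_MS);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        // trivial usage: retries until the call stops throwing
        String value = retry(new Callable<String>() {
          public String call() { return "ready"; }
        });
        System.out.println(value);
      }
    }
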
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/JvmManager.java Mon Jun 15 13:23:44 2009
@@ -438,7 +438,7 @@
                 .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
                     ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
 
-            controller.killTaskJVM(initalContext);
+            controller.destroyTaskJVM(initalContext);
           } else {
             LOG.info(String.format("JVM Not killed %s but just removed", jvmId
                 .toString()));

Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Mon Jun 15 13:23:44 2009
@@ -24,12 +24,14 @@
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
@@ -107,6 +109,7 @@
    */
   enum TaskCommands {
     LAUNCH_TASK_JVM,
+    TERMINATE_TASK_JVM,
     KILL_TASK_JVM
   }
   
@@ -126,38 +129,65 @@
       TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
           env.logSize, true);
 
+    StringBuffer sb = new StringBuffer();
+    // Export all the environment variables before the child command, since
+    // environment variables such as those beginning with LD_* are not
+    // passed through to the setuid/setgid binary.
+    for(Entry<String, String> entry : env.env.entrySet()) {
+      sb.append("export ");
+      sb.append(entry.getKey());
+      sb.append("=");
+      sb.append(entry.getValue());
+      sb.append("\n");
+    }
+    sb.append(cmdLine);
     // write the command to a file in the
     // task specific cache directory
-    writeCommand(cmdLine, getTaskCacheDirectory(context));
+    writeCommand(sb.toString(), getTaskCacheDirectory(context));
     
     // Call the taskcontroller with the right parameters.
-    List<String> launchTaskJVMArgs = buildTaskCommandArgs(context);
+    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context);
     ShellCommandExecutor shExec =  buildTaskControllerExecutor(
                                     TaskCommands.LAUNCH_TASK_JVM, 
                                     env.conf.getUser(),
                                     launchTaskJVMArgs, env);
     context.shExec = shExec;
-    shExec.execute();
-    LOG.debug("output after executing task jvm = " + shExec.getOutput());
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while launching task JVM : " + 
+          StringUtils.stringifyException(e));
+      LOG.warn("Exit code from task is : " + shExec.getExitCode());
+      LOG.warn("Output from task-controller is : " + shExec.getOutput());
+      throw new IOException(e);
+    }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("output after executing task jvm = " + shExec.getOutput()); 
+    }
   }
 
-  // convenience API for building command arguments for specific commands
-  private List<String> buildTaskCommandArgs(TaskControllerContext context) {
+  /**
+   * Returns the list of arguments to be passed while launching the task VM.
+   * See {@code buildTaskControllerExecutor(TaskCommands, 
+   * String, List<String>, JvmEnv)} documentation.
+   * @param context
+   * @return Arguments to be used while launching the task VM
+   */
+  private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
     List<String> commandArgs = new ArrayList<String>(3);
     String taskId = context.task.getTaskID().toString();
     String jobId = getJobId(context);
+    LOG.debug("getting the task directory as: " 
+        + getTaskCacheDirectory(context));
+    commandArgs.add(getDirectoryChosenForTask(
+        new File(getTaskCacheDirectory(context)), 
+        context));
     commandArgs.add(jobId);
     if(!context.task.isTaskCleanupTask()) {
       commandArgs.add(taskId);
     }else {
       commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
     }
-    
-    LOG.debug("getting the task directory as: " 
-                + getTaskCacheDirectory(context));
-    commandArgs.add(getDirectoryChosenForTask(
-                              new File(getTaskCacheDirectory(context)), 
-                              context));
     return commandArgs;
   }
   
@@ -173,7 +203,7 @@
   // in mapred.local.dir chosen for storing data pertaining to
   // this task.
   private String getDirectoryChosenForTask(File directory,
-                                            TaskControllerContext context) {
+      TaskControllerContext context) {
     String jobId = getJobId(context);
     String taskId = context.task.getTaskID().toString();
     for (String dir : mapredLocalDirs) {
@@ -184,43 +214,13 @@
         return dir;
       }
     }
-    
+
     LOG.error("Couldn't parse task cache directory correctly");
     throw new IllegalArgumentException("invalid task cache directory "
-                + directory.getAbsolutePath());
+        + directory.getAbsolutePath());
   }
   
   /**
-   * Kill a launched task JVM running as the user of the job.
-   * 
-   * This method will launch the task controller setuid executable
-   * that in turn will kill the task JVM by sending a kill signal.
-   * @param context the context storing the task running within the JVM
-   * that needs to be killed.
-   */
-  void killTaskJVM(TaskControllerContext context) {
-   
-    if(context.task == null) {
-      LOG.info("Context task null not killing the JVM");
-      return;
-    }
-    
-    JvmEnv env = context.env;
-    List<String> killTaskJVMArgs = buildTaskCommandArgs(context);
-    try {
-      ShellCommandExecutor shExec = buildTaskControllerExecutor(
-                                      TaskCommands.KILL_TASK_JVM,
-                                      context.env.conf.getUser(),
-                                      killTaskJVMArgs, 
-                                      context.env);
-      shExec.execute();
-      LOG.debug("Command output :" +shExec.getOutput());
-    } catch (IOException ioe) {
-      LOG.warn("IOException in killing task: " + ioe.getMessage());
-    }
-  }
-
-  /**
    * Setup appropriate permissions for directories and files that
    * are used by the task.
    * 
@@ -281,9 +281,24 @@
       LOG.warn("Could not change permissions for directory " + dir);
     }
   }
-  
-  // convenience API to create the executor for launching the
-  // setuid script.
+  /**
+   * Builds the command line for launching/terminating/killing a task JVM.
+   * The command-line formats are as follows:
+   * <br/>
+   * For launching a task JVM:
+   * <br/>
+   * {@code user-name command tt-root job_id task_id} 
+   * <br/>
+   * For terminating/killing a task JVM:
+   * {@code user-name command tt-root task-pid}
+   * 
+   * @param command command to be executed.
+   * @param userName user name
+   * @param cmdArgs list of extra arguments
+   * @param env JVM environment variables.
+   * @return {@link ShellCommandExecutor}
+   * @throws IOException
+   */
   private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command, 
                                           String userName, 
                                           List<String> cmdArgs, JvmEnv env) 
@@ -420,6 +435,67 @@
     }
   }
 
+  /**
+   * Builds the command-line arguments to be passed to the
+   * LinuxTaskController binary to terminate/kill the task. See 
+   * {@code buildTaskControllerExecutor(TaskCommands, 
+   * String, List<String>, JvmEnv)} documentation.
+   * 
+   * @param context the context of the task to which the kill signal has
+   * to be sent.
+   */
+  private List<String> buildKillTaskCommandArgs(TaskControllerContext 
+      context){
+    List<String> killTaskJVMArgs = new ArrayList<String>();
+    killTaskJVMArgs.add(context.pid);
+    return killTaskJVMArgs;
+  }
+  
+  /**
+   * Convenience method for sending the appropriate kill signal to the
+   * task VM.
+   * @param context
+   * @param command
+   * @throws IOException
+   */
+  private void finishTask(TaskControllerContext context,
+      TaskCommands command) throws IOException{
+    if(context.task == null) {
+      LOG.info("Context task is null; not killing the JVM");
+      return;
+    }
+    ShellCommandExecutor shExec = buildTaskControllerExecutor(
+        command, context.env.conf.getUser(), 
+        buildKillTaskCommandArgs(context), context.env);
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      LOG.warn("Output from task-controller is : " + shExec.getOutput());
+      throw new IOException(e);
+    }
+  }
+  
+  @Override
+  void terminateTask(TaskControllerContext context) {
+    try {
+      finishTask(context, TaskCommands.TERMINATE_TASK_JVM);
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while sending kill to the Task VM " + 
+          StringUtils.stringifyException(e));
+    }
+  }
+  
+  @Override
+  void killTask(TaskControllerContext context) {
+    try {
+      finishTask(context, TaskCommands.KILL_TASK_JVM);
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while sending destroy to the Task VM " + 
+          StringUtils.stringifyException(e));
+    }
+  }
+
   protected String getTaskControllerExecutablePath() {
     return taskControllerExe;
   }  

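The wrapper-script change works around the fact that setuid/setgid executables do not see LD_* variables, so the launcher now writes the child's environment into the command file as export lines ahead of the actual command. A hedged sketch of that script-writing step (method and path names are illustrative); note that values containing spaces or shell metacharacters would need quoting, which the sketch, like the patch, does not attempt:

    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.Writer;
    import java.util.Map;

    // Illustrative sketch of writing a wrapper script that re-exports the
    // child's environment before the real command line, as the patch does
    // for the setuid task-controller.
    public class TaskScriptWriter {
      static void writeCommand(Map<String, String> env, String cmdLine,
          String scriptPath) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : env.entrySet()) {
          // one "export KEY=VALUE" line per environment variable
          sb.append("export ").append(entry.getKey()).append('=')
            .append(entry.getValue()).append('\n');
        }
        sb.append(cmdLine).append('\n');
        Writer out = new FileWriter(scriptPath);
        try {
          out.write(sb.toString());
        } finally {
          out.close();
        }
      }
    }
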
Modified: hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=784771&r1=784770&r2=784771&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/mapred/org/apache/hadoop/mapred/TaskController.java Mon Jun 15 13:23:44 2009
@@ -19,10 +19,12 @@
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
@@ -38,6 +40,8 @@
   
   private Configuration conf;
   
+  public static final Log LOG = LogFactory.getLog(TaskController.class);
+  
   public Configuration getConf() {
     return conf;
   }
@@ -63,13 +67,29 @@
                                       throws IOException;
   
   /**
-   * Kill a task JVM
+   * Top-level method for cleaning up a task JVM.
+   *
+   * The current implementation does the following.
+   * <ol>
+   * <li>Sends a graceful terminate signal to the task JVM, allowing its
+   * sub-processes to clean up.</li>
+   * <li>Waits for the stipulated period.</li>
+   * <li>Sends a forceful kill signal to the task JVM, terminating all its
+   * sub-processes forcefully.</li>
+   * </ol>
    * 
-   * This method defines how a JVM launched to execute one or more
-   * tasks will be killed.
-   * @param context
+   * @param context the context of the task to which the kill signal has
+   * to be sent.
    */
-  abstract void killTaskJVM(TaskControllerContext context);
+  final void destroyTaskJVM(TaskControllerContext context) {
+    terminateTask(context);
+    try {
+      Thread.sleep(context.sleeptimeBeforeSigkill);
+    } catch (InterruptedException e) {
+      LOG.warn("Sleep interrupted : " + 
+          StringUtils.stringifyException(e));
+    }
+    killTask(context);
+  }
   
   /**
    * Perform initializing actions required before a task can run.
@@ -110,4 +130,20 @@
    * @param tip  Task of job for which localization happens.
    */
   abstract void initializeJob(JobID jobId);
+  
+  /**
+   * Sends a graceful terminate signal to the task JVM and its sub-processes. 
+   *   
+   * @param context task context
+   */
+  abstract void terminateTask(TaskControllerContext context);
+  
+  /**
+   * Sends a KILL signal to forcefully terminate the task JVM and its
+   * sub-processes.
+   * 
+   * @param context task context
+   */
+  abstract void killTask(TaskControllerContext context);
 }

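destroyTaskJVM() is now a final template method in TaskController: graceful signal, wait, forceful signal, with subclasses supplying only the two hooks. A minimal standalone sketch of that sequence (the Destroyable interface here is illustrative, not a Hadoop type):

    // Standalone sketch of the terminate -> wait -> kill sequence that the
    // new final destroyTaskJVM() encodes.
    interface Destroyable {
      void terminate();   // graceful signal
      void kill();        // forceful signal
    }

    class TwoPhaseShutdown {
      static void destroy(Destroyable target, long graceMillis) {
        target.terminate();
        try {
          Thread.sleep(graceMillis);   // give sub-processes time to clean up
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        target.kill();
      }
    }
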

