hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sra...@apache.org
Subject svn commit: r1350835 - in /hadoop/common/branches/branch-1-win: ./ src/core/org/apache/hadoop/util/ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/winutils/
Date Sat, 16 Jun 2012 00:33:16 GMT
Author: sradia
Date: Sat Jun 16 00:33:15 2012
New Revision: 1350835

URL: http://svn.apache.org/viewvc?rev=1350835&view=rev
Log:
MAPREDUCE-4260 Use JobObject to spawn tasks on Windows (Bikas Saha via Sanjay)

Added:
    hadoop/common/branches/branch-1-win/src/winutils/task.c
Modified:
    hadoop/common/branches/branch-1-win/CHANGES.txt
    hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/util/ProcessTree.java
    hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskController.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
    hadoop/common/branches/branch-1-win/src/winutils/common.h
    hadoop/common/branches/branch-1-win/src/winutils/main.c
    hadoop/common/branches/branch-1-win/src/winutils/winutils.vcxproj

Modified: hadoop/common/branches/branch-1-win/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.txt?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.txt Sat Jun 16 00:33:15 2012
@@ -31,6 +31,8 @@ branch-hadoop-1-win - unreleased
     MAPREDUCE-4321. Fix both DCE & LCE to use File.getAbsolutePath of the
     taskjvm.(sh,cmd) to ensure it works on Windows. (Ivan Mitic via acmurthy)
 
+    MAPREDUCE-4260 Use JobObject to spawn tasks on Windows (Bikas Saha via Sanjay)
+
 Release 1.1.0 - unreleased
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/util/ProcessTree.java?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/util/ProcessTree.java (original)
+++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/util/ProcessTree.java Sat Jun 16 00:33:15 2012
@@ -49,23 +49,49 @@ public class ProcessTree {
     }
   }
 
-  public static final boolean isSetsidAvailable = isSetsidSupported();
-  private static boolean isSetsidSupported() {
-    if (Shell.DISABLEWINDOWS_TEMPORARILY)
-      return false;
-    ShellCommandExecutor shexec = null;
-    boolean setsidSupported = true;
-    try {
-      String[] args = {"setsid", "bash", "-c", "echo $$"};
-      shexec = new ShellCommandExecutor(args);
-      shexec.execute();
-    } catch (IOException ioe) {
-      LOG.warn("setsid is not available on this machine. So not using it.");
-      setsidSupported = false;
-    } finally { // handle the exit code
-      LOG.info("setsid exited with exit code " + shexec.getExitCode());
+  // TODO rename isSetsidAvailable after merge of branch-1-win (MAPREDUCE-4325)
+  public static boolean isSetsidAvailable = isProcessGroupSupported();
+  private static boolean isProcessGroupSupported() {
+    boolean processGroupSupported = true;
+    if (Shell.WINDOWS) {
+      ShellCommandExecutor shexec = null;
+      try {
+        String args[] = {Shell.WINUTILS};
+        shexec = new ShellCommandExecutor(args);
+        shexec.execute();
+      } catch (IOException e) {
+      } finally {
+        String result = shexec.getOutput();
+        if (result == null
+            || !result.contains("Creates a new task jobobject with taskname")) {
+          processGroupSupported = false;
+        }
+      }
     }
-    return setsidSupported;
+    else {
+      ShellCommandExecutor shexec = null;
+      try {
+        String[] args = {"setsid", "bash", "-c", "echo $$"};
+        shexec = new ShellCommandExecutor(args);
+        shexec.execute();
+      } catch (IOException ioe) {
+        LOG.warn("setsid is not available on this machine. So not using it.");
+        processGroupSupported = false;
+      } finally { // handle the exit code
+        LOG.info("setsid exited with exit code " + shexec.getExitCode());
+      }
+    }
+    if(processGroupSupported) {
+      LOG.info("Platform supports process groups");
+    }
+    else {
+      LOG.info("Platform does not support process groups");
+    }
+    return processGroupSupported;
+  }
+  
+  public static void disableProcessGroups() {
+    isSetsidAvailable = false;
   }
 
   /**
@@ -110,15 +136,12 @@ public class ProcessTree {
     String[] args = null;
     if(Shell.WINDOWS){
       if (signal == Signal.KILL) {
-        String[] wargs = { "taskkill", "/F", "/PID", pid };
-        args = wargs;
+        args = new String[] { "taskkill", "/T", "/F", "/PID", pid };
       } else {
-        String[] wargs = { "taskkill", "/PID", pid };
-        args = wargs;
+        args = new String[] { "taskkill", "/T", "/PID", pid };
       }
     } else {
-     String[] uargs = { "kill", "-" + signal.getValue(), pid };
-     args = uargs;
+      args = new String[] { "kill", "-" + signal.getValue(), pid };
     }
     ShellCommandExecutor shexec = new ShellCommandExecutor(args);
     try {
@@ -146,7 +169,12 @@ public class ProcessTree {
       return;
     }
 
-    String[] args = { "kill", "-" + signal.getValue() , "-"+pgrpId };
+    String[] args = null;
+    if(Shell.WINDOWS){
+      args = new String[] {Shell.WINUTILS, "task", "kill", pgrpId};
+    } else {
+      args = new String[] { "kill", "-" + signal.getValue() , "-"+pgrpId };
+    }
     ShellCommandExecutor shexec = new ShellCommandExecutor(args);
     try {
       shexec.execute();
@@ -197,26 +225,38 @@ public class ProcessTree {
   /**
    * Is the process group with  still alive?
    * 
-   * This method assumes that isAlive is called on a pid that was alive not
+   * On Linux, this method assumes that isAlive is called on a pid that was alive not
    * too long ago, and hence assumes no chance of pid-wrapping-around.
+   * On Windows, this uses jobobjects
    * 
    * @param pgrpId process group id
    * @return true if any of process in group is alive.
    */
   public static boolean isProcessGroupAlive(String pgrpId) {
-    ShellCommandExecutor shexec = null;
-    try {
-      String[] args = { "kill", "-0", "-"+pgrpId };
-      shexec = new ShellCommandExecutor(args);
-      shexec.execute();
-    } catch (ExitCodeException ee) {
-      return false;
-    } catch (IOException ioe) {
-      LOG.warn("Error executing shell command "
-          + Arrays.toString(shexec.getExecString()) + ioe);
-      return false;
+    if (Shell.WINDOWS) {
+      try {
+        String result = Shell.execCommand("cmd", "/c", Shell.WINUTILS
+            + " task isAlive " + pgrpId);
+        return (result.contains("IsAlive"));
+      } catch (IOException ioe) {
+        LOG.warn("Error executing shell command", ioe);
+        return false;
+      }
+    } else {
+      ShellCommandExecutor shexec = null;
+      try {
+        String[] args = { "kill", "-0", "-"+pgrpId };
+        shexec = new ShellCommandExecutor(args);
+        shexec.execute();
+      } catch (ExitCodeException ee) {
+        return false;
+      } catch (IOException ioe) {
+        LOG.warn("Error executing shell command "
+            + Arrays.toString(shexec.getExecString()) + ioe);
+        return false;
+      }
+      return (shexec.getExitCode() == 0 ? true : false);
     }
-    return (shexec.getExitCode() == 0 ? true : false);
   }
   
 

Modified: hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/mapred-default.xml Sat Jun 16 00:33:15 2012
@@ -416,6 +416,15 @@
 -->
 
 <property>
+  <name>mapred.processgroup.enabled</name>
+  <value>true</value>
+  <description>Use process groups for task management.
+  Ability to use process groups is determined by the system. It can only be 
+  disabled via config.
+  </description>
+</property>
+
+<property>
   <name>mapred.child.java.opts</name>
   <value>-Xmx200m</value>
   <description>Java opts for the task tracker child processes.  

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Child.java Sat Jun 16 00:33:15 2012
@@ -161,16 +161,16 @@ class Child {
     t.start();
     
     String pid = "";
-    if (!Shell.WINDOWS) {
-      pid = System.getenv().get("JVM_PID");
-    }
-    else {
+    pid = System.getenv().get("JVM_PID");
+    if(pid == null || pid.length() == 0) {
+      pid = "";
+      // fallback to using JVM bean if PID is not set in env
       final String jvmName = ManagementFactory.getRuntimeMXBean().getName();
       String[] parts = jvmName.split("@");
       try {
         pid = Long.toString(Long.parseLong(parts[0]));
       } catch (NumberFormatException nfe) {
-        LOG.equals("Failed to get pid for jvmId:" + jvmId);
+        LOG.info("Failed to get pid for jvmId:" + jvmId);
       }
     }
     JvmContext context = new JvmContext(jvmId, pid);

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Sat Jun 16 00:33:15 2012
@@ -126,9 +126,19 @@ public class DefaultTaskController exten
 
       String commandFile = writeCommand(cmdLine, rawFs, p).getAbsolutePath();
       rawFs.setPermission(p, TaskController.TASK_LAUNCH_SCRIPT_PERMISSION);
-      shExec = new ShellCommandExecutor(Shell.WINDOWS? new String[]{"cmd", "/c", commandFile} :
-          new String[]{"bash", "-c", commandFile},
-          currentWorkDirectory);
+      String[] commandArray = null;
+      if(Shell.WINDOWS) {
+        if(ProcessTree.isSetsidAvailable) {
+          commandArray = new String[] { Shell.WINUTILS, "task", "create",
+              attemptId, "cmd /c " + commandFile };
+        }
+        else {
+          commandArray = new String[]{ "cmd", "/c", commandFile};
+        }
+      } else {
+        commandArray = new String[] { "bash", "-c", commandFile };
+      }
+      shExec = new ShellCommandExecutor(commandArray, currentWorkDirectory);
       shExec.execute();
     } catch (Exception e) {
       if (shExec == null) {
@@ -205,11 +215,11 @@ public class DefaultTaskController exten
   }
 
   @Override
-  public void signalTask(String user, int taskPid, Signal signal) {
+  public void signalTask(String user, String taskPid, Signal signal) {
     if (ProcessTree.isSetsidAvailable) {
-      ProcessTree.killProcessGroup(Integer.toString(taskPid), signal);
+      ProcessTree.killProcessGroup(taskPid, signal);
     } else {
-      ProcessTree.killProcess(Integer.toString(taskPid), signal);      
+      ProcessTree.killProcess(taskPid, signal);      
     }
   }
 

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobConf.java Sat Jun 16 00:33:15 2012
@@ -165,6 +165,8 @@ public class JobConf extends Configurati
   static final String MR_ACLS_ENABLED = "mapred.acls.enabled";
 
   static final String MR_ADMINS = "mapreduce.cluster.administrators";
+  
+  static final String MAPRED_PROCESSGROUP_ENABLED = "mapred.processgroup.enabled";
 
   /**
    * Configuration key to set the java command line options for the child
@@ -474,6 +476,22 @@ public class JobConf extends Configurati
   }
 
 
+  /**
+   * Get if Jobobject should be used for tasks on Windows
+   * 
+   * @return use Windows Jobobject or not
+   */
+  public boolean processGroupEnabled() {
+    return getBoolean(MAPRED_PROCESSGROUP_ENABLED, true);
+  }
+
+  /**
+   * Set whether to use Windows Jobobjects for tasks 
+   * @param value <code>true</code> to use Windows Jobobjects
+   */
+  public void setProcessGroupEnabled(boolean value) {
+    setBoolean(MAPRED_PROCESSGROUP_ENABLED, value);
+  }
   
   /**
    * Set whether the framework should keep the intermediate files for 

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JvmManager.java Sat Jun 16 00:33:15 2012
@@ -516,10 +516,10 @@ class JvmManager {
 
       private class DelayedProcessKiller extends Thread {
         private final String user;
-        private final int pid;
+        private final String pid;
         private final long delay;
         private final Signal signal;
-        DelayedProcessKiller(String user, int pid, long delay, Signal signal) {
+        DelayedProcessKiller(String user, String pid, long delay, Signal signal) {
           this.user = user;
           this.pid = pid;
           this.delay = delay;
@@ -548,14 +548,13 @@ class JvmManager {
           String pidStr = jvmIdToPid.get(jvmId);
           if (pidStr != null && !pidStr.equals("")){ 
             String user = env.conf.getUser();
-            int pid = Integer.parseInt(pidStr);
             // start a thread that will kill the process dead
             if (sleeptimeBeforeSigkill > 0) {
-              new DelayedProcessKiller(user, pid, sleeptimeBeforeSigkill, 
+              new DelayedProcessKiller(user, pidStr, sleeptimeBeforeSigkill, 
                                        Signal.KILL).start();
-              controller.signalTask(user, pid, Signal.TERM);
+              controller.signalTask(user, pidStr, Signal.TERM);
             } else {
-              controller.signalTask(user, pid, Signal.KILL);
+              controller.signalTask(user, pidStr, Signal.KILL);
             }
           } else {
             LOG.info(String.format("JVM Not killed %s but just removed", jvmId

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Sat Jun 16 00:33:15 2012
@@ -296,14 +296,14 @@ class LinuxTaskController extends TaskCo
   }
 
   @Override
-  public void signalTask(String user, int taskPid, 
+  public void signalTask(String user, String taskPid, 
                          Signal signal) throws IOException {
     String[] command = 
       new String[]{taskControllerExe, 
                    user,
                    localStorage.getDirsString(),
                    Integer.toString(Commands.SIGNAL_TASK.getValue()),
-                   Integer.toString(taskPid),
+                   taskPid,
                    Integer.toString(signal.getValue())};
     ShellCommandExecutor shExec = new ShellCommandExecutor(command);
     if (LOG.isDebugEnabled()) {

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskController.java Sat Jun 16 00:33:15 2012
@@ -132,7 +132,7 @@ public abstract class TaskController imp
    * @param taskPid the pid of the task
    * @param signal the id of the signal to send
    */
-  public abstract void signalTask(String user, int taskPid, 
+  public abstract void signalTask(String user, String taskPid, 
                                   Signal signal) throws IOException;
   
   /**

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sat Jun 16 00:33:15 2012
@@ -94,6 +94,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.security.token.Token;
@@ -697,6 +698,11 @@ public class TaskTracker implements MRCo
     LOG.info("Starting tasktracker with owner as "
         + getMROwner().getShortUserName());
 
+    if(!fConf.processGroupEnabled()) {
+      // override system determined usage of process groups
+      ProcessTree.disableProcessGroups();
+    }
+    
     localFs = FileSystem.getLocal(fConf);
     if (fConf.get("slave.host.name") != null) {
       this.localHostname = fConf.get("slave.host.name");

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java Sat Jun 16 00:33:15 2012
@@ -143,8 +143,14 @@ public class TestKillSubProcesses extend
   private static RunningJob runJobAndSetProcessHandle(JobTracker jt, JobConf conf)
                      throws IOException {
     RunningJob job = runJob(conf);
+    long startTime = System.currentTimeMillis();
     while (job.getJobState() != JobStatus.RUNNING) {
       try {
+        // without this assert junit eventually times out without showing 
+        // any logs in the test output file and that makes debugging hard
+        if(System.currentTimeMillis() - startTime > 60000) {
+          assertTrue("job did not change to running state", false);
+        }
         Thread.sleep(100);
       } catch (InterruptedException e) {
         break;
@@ -195,33 +201,50 @@ public class TestKillSubProcesses extend
 
     // Checking if the descendant processes of map task are alive
     if(ProcessTree.isSetsidAvailable) {
-      String childPid = UtilsForTests.getPidFromPidFile(
-                               scriptDirName + "/childPidFile" + 0);
-      while(childPid == null) {
-        LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping...");
+      if(Shell.WINDOWS) {
         try {
-          Thread.sleep(500);
+          Thread.sleep(1000);
+          String result = Shell.execCommand("cmd", "/c", Shell.WINUTILS
+              + " task isAlive " + pid);
+          assertTrue("Map process tree not alive", result.contains("IsAlive"));
+          String[] parts = result.split("[,\r\n]");
+          assertTrue(parts.length >= 2);
+          // > numLevelsOfSubProcesses because of winutils etc are also part of 
+          // the task job object along with the spawned scripts
+          assertTrue(Integer.parseInt(parts[1]) > numLevelsOfSubProcesses);
         } catch (InterruptedException ie) {
-          LOG.warn("sleep is interrupted:" + ie);
-          break;
+          // ignore
         }
-        childPid = UtilsForTests.getPidFromPidFile(
-                               scriptDirName + "/childPidFile" + 0);
       }
-
-      // As childPidFile0(leaf process in the subtree of processes with
-      // map task as root) is created, all other child pid files should
-      // have been created already(See the script for details).
-      // Now check if the descendants of map task are alive.
-      for(int i=0; i <= numLevelsOfSubProcesses; i++) {
-        childPid = UtilsForTests.getPidFromPidFile(
-                               scriptDirName + "/childPidFile" + i);
-        LOG.info("pid of the descendant process at level " + i +
-                 "in the subtree of processes(with the map task as the root)" +
-                 " is " + childPid);
-        assertTrue("Unexpected: The subprocess at level " + i +
-                   " in the subtree is not alive before Job completion",
-                   isAlive(childPid));
+      else {
+        String childPid = UtilsForTests.getPidFromPidFile(
+                                 scriptDirName + "/childPidFile" + 0);
+        while(childPid == null) {
+          LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping...");
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException ie) {
+            LOG.warn("sleep is interrupted:" + ie);
+            break;
+          }
+          childPid = UtilsForTests.getPidFromPidFile(
+                                 scriptDirName + "/childPidFile" + 0);
+        }
+  
+        // As childPidFile0(leaf process in the subtree of processes with
+        // map task as root) is created, all other child pid files should
+        // have been created already(See the script for details).
+        // Now check if the descendants of map task are alive.
+        for(int i=0; i <= numLevelsOfSubProcesses; i++) {
+          childPid = UtilsForTests.getPidFromPidFile(
+                                 scriptDirName + "/childPidFile" + i);
+          LOG.info("pid of the descendant process at level " + i +
+                   "in the subtree of processes(with the map task as the root)" +
+                   " is " + childPid);
+          assertTrue("Unexpected: The subprocess at level " + i +
+                     " in the subtree is not alive before Job completion",
+                     isAlive(childPid));
+        }
       }
     }
     return job;
@@ -233,8 +256,14 @@ public class TestKillSubProcesses extend
   private static void validateKillingSubprocesses(RunningJob job, JobConf conf)
                    throws IOException {
     // wait till the the job finishes
+    long startTime = System.currentTimeMillis();
     while (!job.isComplete()) {
       try {
+        // without this assert junit eventually times out without showing 
+        // any logs in the test output file and that makes debugging hard
+        if(System.currentTimeMillis() - startTime > 60000) {
+          assertTrue("job did not complete", false);
+        }
         Thread.sleep(500);
       } catch (InterruptedException e) {
         break;
@@ -242,20 +271,26 @@ public class TestKillSubProcesses extend
     }
 
     // Checking if the map task got killed or not
-    assertTrue(!ProcessTree.isAlive(pid));
+    assertTrue(!isAlive(pid));
     LOG.info("The map task is not alive after Job is completed, as expected.");
 
     // Checking if the descendant processes of map task are killed properly
     if(ProcessTree.isSetsidAvailable) {
-      for(int i=0; i <= numLevelsOfSubProcesses; i++) {
-        String childPid = UtilsForTests.getPidFromPidFile(
-                               scriptDirName + "/childPidFile" + i);
-        LOG.info("pid of the descendant process at level " + i +
-                 "in the subtree of processes(with the map task as the root)" +
-                 " is " + childPid);
-        assertTrue("Unexpected: The subprocess at level " + i +
-                   " in the subtree is alive after Job completion",
-                   !isAlive(childPid));
+      if(Shell.WINDOWS) {
+        String result = Shell.execCommand("cmd", "/c", Shell.WINUTILS
+            + " task isAlive " + pid);
+        assertTrue("Map process tree not alive", !result.contains("IsAlive"));
+      } else {
+        for(int i=0; i <= numLevelsOfSubProcesses; i++) {
+          String childPid = UtilsForTests.getPidFromPidFile(
+                                 scriptDirName + "/childPidFile" + i);
+          LOG.info("pid of the descendant process at level " + i +
+                   "in the subtree of processes(with the map task as the root)" +
+                   " is " + childPid);
+          assertTrue("Unexpected: The subprocess at level " + i +
+                     " in the subtree is alive after Job completion",
+                     !isAlive(childPid));
+        }
       }
     }
     FileSystem fs = FileSystem.getLocal(mr.createJobConf());
@@ -302,14 +337,7 @@ public class TestKillSubProcesses extend
     return UtilsForTests.runJob(conf, inDir, outDir);
   }
   
-  public void testJobKillFailAndSucceed() throws IOException {
-    if (Shell.DISABLEWINDOWS_TEMPORARILY) {
-      System.out
-          .println("setsid doesn't work on WINDOWS as expected. Not testing."
-              + "There also seem to be issues related to file permissions");
-      return;
-    }
-    
+  public void testJobKillFailAndSucceed() throws IOException {    
     try {
       JobConf conf = new JobConf();
       conf.setLong(JvmManager.JvmManagerForType.DELAY_BEFORE_KILL_KEY, 0L);
@@ -368,26 +396,38 @@ public class TestKillSubProcesses extend
       fs.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL,
           FsAction.ALL));
 
+     String script = null;
+     
      // create shell script
      Random rm = new Random();
       Path scriptPath = new Path(scriptDirName, "_shellScript_" + rm.nextInt()
-        + ".sh");
+        + (Shell.WINDOWS?".cmd":".sh"));
       String shellScript = scriptPath.toString();
 
-      // Construct the script. Set umask to 0000 so that TT can access all the
-      // files.
-      String script =
-        "umask 000\n" + 
-        "echo $$ > " + scriptDirName + "/childPidFile" + "$1\n" +
-        "echo hello\n" +
-        "trap 'echo got SIGTERM' 15 \n" +
-        "if [ $1 != 0 ]\nthen\n" +
-        " sh " + shellScript + " $(($1-1))\n" +
-        "else\n" +
-        " while true\n do\n" +
-        "  sleep 2\n" +
-        " done\n" +
-        "fi";
+     if(Shell.WINDOWS) {
+       script = 
+           "SETLOCAL ENABLEDELAYEDEXPANSION\n" +
+           ":while\n" +
+           "timeout 2 /NOBREAK\n" +
+           "goto :while\n";
+     } else {
+       // Construct the script. Set umask to 0000 so that TT can access all the
+       // files.
+       script =
+         "umask 000\n" + 
+         "echo $$ > " + scriptDirName + "/childPidFile" + "$1\n" +
+         "echo hello\n" +
+         "trap 'echo got SIGTERM' 15 \n" +
+         "if [ $1 != 0 ]\nthen\n" +
+         " sh " + shellScript + " $(($1-1))\n" +
+         "else\n" +
+         " while true\n do\n" +
+         "  sleep 2\n" +
+         " done\n" +
+         "fi";
+     }
+     
+
       DataOutputStream file = fs.create(scriptPath);
       file.writeBytes(script);
       file.close();
@@ -396,21 +436,30 @@ public class TestKillSubProcesses extend
       new File(scriptPath.toUri().getPath()).setExecutable(true);
 
       LOG.info("Calling script from map task : " + shellScript);
-      Runtime.getRuntime()
-          .exec(shellScript + " " + numLevelsOfSubProcesses);
-    
-      String childPid = UtilsForTests.getPidFromPidFile(scriptDirName
-          + "/childPidFile" + 0);
-      while (childPid == null) {
-        LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping...");
-        try {
-          Thread.sleep(500);
-        } catch (InterruptedException ie) {
-          LOG.warn("sleep is interrupted:" + ie);
-          break;
-        }
-        childPid = UtilsForTests.getPidFromPidFile(scriptDirName
+      
+      if(Shell.WINDOWS) {
+        for(int i=0; i<numLevelsOfSubProcesses; ++i) {
+          // recursive batch script is not working inside the JVM on windows
+          // so creating copies of it
+          Runtime.getRuntime().exec(shellScript);
+        }  
+      } else {
+        Runtime.getRuntime()
+        .exec(shellScript + " " + numLevelsOfSubProcesses);        
+
+        String childPid = UtilsForTests.getPidFromPidFile(scriptDirName
             + "/childPidFile" + 0);
+        while (childPid == null) {
+          LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping...");
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException ie) {
+            LOG.warn("sleep is interrupted:" + ie);
+            break;
+          }
+          childPid = UtilsForTests.getPidFromPidFile(scriptDirName
+              + "/childPidFile" + 0);
+        }
       }
     }
   }
@@ -507,6 +556,14 @@ public class TestKillSubProcesses extend
    * @return if a process is alive or not.
    */
   private static boolean isAlive(String pid) throws IOException {
+    if(Shell.WINDOWS) {
+      if(ProcessTree.isSetsidAvailable) {
+        return ProcessTree.isProcessGroupAlive(pid);
+      }
+      else {
+        return ProcessTree.isAlive(pid);
+      }
+    }
     String commandString ="ps -o pid,command -e";
     String args[] = new String[] {"bash", "-c" , commandString};
     ShellCommandExecutor shExec = new ShellCommandExecutor(args);

Modified: hadoop/common/branches/branch-1-win/src/winutils/common.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/winutils/common.h?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/winutils/common.h (original)
+++ hadoop/common/branches/branch-1-win/src/winutils/common.h Sat Jun 16 00:33:15 2012
@@ -26,6 +26,7 @@
 #include <windows.h>
 #include <aclapi.h>
 #include <accctrl.h>
+#include <tchar.h>
 #include <strsafe.h>
 
 /*
@@ -82,6 +83,9 @@ void GroupsUsage(LPCWSTR program);
 int Hardlink(int argc, wchar_t *argv[]);
 void HardlinkUsage();
 
+int Task(int argc, wchar_t *argv[]);
+void TaskUsage();
+
 DWORD GetFileInformationByName(__in LPCWSTR pathName,
   __out LPBY_HANDLE_FILE_INFORMATION lpFileInformation);
 

Modified: hadoop/common/branches/branch-1-win/src/winutils/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/winutils/main.c?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/winutils/main.c (original)
+++ hadoop/common/branches/branch-1-win/src/winutils/main.c Sat Jun 16 00:33:15 2012
@@ -51,6 +51,10 @@ int wmain(int argc, wchar_t* argv[])
   {
     return Hardlink(argc - 1, argv + 1);
   }
+  else if (wcscmp(L"task", cmd) == 0)
+  {
+    return Task(argc - 1, argv + 1);
+  }
   else
   {
     Usage(argv[0]);
@@ -86,4 +90,8 @@ The available commands and their usages 
   fwprintf(stdout, L"%-10s%s\n\n", L"hardlink", L"Hard link operations.");
   HardlinkUsage();
   fwprintf(stdout, L"\n\n");
+
+  fwprintf(stdout, L"%-10s%s\n\n", L"task", L"Task operations.");
+  TaskUsage();
+  fwprintf(stdout, L"\n\n");
 }
\ No newline at end of file

Added: hadoop/common/branches/branch-1-win/src/winutils/task.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/winutils/task.c?rev=1350835&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/winutils/task.c (added)
+++ hadoop/common/branches/branch-1-win/src/winutils/task.c Sat Jun 16 00:33:15 2012
@@ -0,0 +1,343 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with this
+* work for additional information regarding copyright ownership. The ASF
+* licenses this file to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+* 
+* http://www.apache.org/licenses/LICENSE-2.0
+* 
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+#include "common.h"
+#include <errno.h>
+
+// List of different task related command line options supported by
+// winutils. ParseCommandLine() maps the sub-command string given after
+// "task" onto one of these values.
+typedef enum TaskCommandOptionType
+{
+  TaskInvalid,   // command line did not match any supported form
+  TaskCreate,    // "task create [TASKNAME] [COMMAND_LINE]"
+  TaskIsAlive,   // "task isAlive [TASKNAME]"
+  TaskKill       // "task kill [TASKNAME]"
+} TaskCommandOption;
+
+//----------------------------------------------------------------------------
+// Function: ParseCommandLine
+//
+// Description:
+//  Maps the arguments of a "task" invocation onto a TaskCommandOption.
+//  The out param 'command' is always written; it stays TaskInvalid unless
+//  the arguments match one of the supported forms:
+//    task isAlive <name>              (argc == 3)
+//    task kill <name>                 (argc == 3)
+//    task create <name> <cmdline>     (argc == 4)
+//
+// Returns:
+// TRUE: If the command line is valid
+// FALSE: otherwise
+static BOOL ParseCommandLine(__in int argc,
+                             __in wchar_t *argv[],
+                             __out TaskCommandOption *command)
+{
+  TaskCommandOption parsed = TaskInvalid;
+
+  *command = TaskInvalid;
+
+  // The first argument must be the "task" keyword itself; the number of
+  // arguments then decides which sub-commands are eligible.
+  if (wcscmp(argv[0], L"task") == 0)
+  {
+    switch (argc)
+    {
+    case 3:
+      if (wcscmp(argv[1], L"isAlive") == 0)
+      {
+        parsed = TaskIsAlive;
+      }
+      else if (wcscmp(argv[1], L"kill") == 0)
+      {
+        parsed = TaskKill;
+      }
+      break;
+
+    case 4:
+      if (wcscmp(argv[1], L"create") == 0)
+      {
+        parsed = TaskCreate;
+      }
+      break;
+
+    default:
+      break;
+    }
+  }
+
+  *command = parsed;
+  return (parsed != TaskInvalid) ? TRUE : FALSE;
+}
+
+//----------------------------------------------------------------------------
+// Function: createTask
+//
+// Description:
+//  Creates a named job object, places the current process in it, launches
+//  'cmdLine' as a child process (which therefore also belongs to the job),
+//  waits for the child to exit and then terminates the whole job so that no
+//  process spawned by the task is left behind.
+//
+// Parameters:
+//  jobObjName: name of the job object to create; also exported to the child
+//              through the JVM_PID environment variable
+//  cmdLine:    command line of the process to launch
+//
+// Returns:
+// The exit code of the child process if it was launched and waited for
+// (ERROR_SUCCESS when the child exits with 0).
+// GetLastError: if any step fails, including ERROR_ALREADY_EXISTS when a job
+// object with the same name already exists
+DWORD createTask(_TCHAR* jobObjName, _TCHAR* cmdLine)
+{
+  DWORD err = ERROR_SUCCESS;
+  DWORD exitCode = EXIT_FAILURE;
+  STARTUPINFO si;
+  PROCESS_INFORMATION pi;
+  SECURITY_ATTRIBUTES sa;
+  HANDLE jobObject = NULL;
+
+  // make the job object handle inheritable so that it can be inherited 
+  // via CreateProcess. If CreateProcess is used in this manner, then spawned 
+  // processes will keep the handle to the job object alive and it can be 
+  // manipulated via name by another process (say winutils task kill)
+  sa.nLength = sizeof(SECURITY_ATTRIBUTES);
+  sa.lpSecurityDescriptor = NULL;
+  sa.bInheritHandle = TRUE;
+  jobObject = CreateJobObject(&sa, jobObjName);
+  err = GetLastError();
+  if(jobObject == NULL)
+  {
+    return err;
+  }
+  if(err == ERROR_ALREADY_EXISTS)
+  {
+    // CreateJobObject handed back a handle to the pre-existing job object;
+    // release it instead of leaking it before reporting the name clash.
+    CloseHandle(jobObject);
+    return err;
+  }
+  // The job object was newly created. Discard whatever GetLastError returned
+  // so that a stale value cannot replace the child's exit code at the end.
+  err = ERROR_SUCCESS;
+  
+  if(AssignProcessToJobObject(jobObject, GetCurrentProcess()) == 0)
+  {
+    err = GetLastError();
+    CloseHandle(jobObject);
+    return err;
+  }
+
+  // the child JVM uses this env var to send the task OS process identifier 
+  // to the TaskTracker. We pass the job object name.
+  if(SetEnvironmentVariable(_T("JVM_PID"), jobObjName) == 0)
+  {
+    err = GetLastError();
+    CloseHandle(jobObject);
+    return err;
+  }
+
+  ZeroMemory( &si, sizeof(si) );
+  si.cb = sizeof(si);
+  ZeroMemory( &pi, sizeof(pi) );
+  // inherit the job object handle so that it can be manipulated via its name 
+  // by another program (say winutils task kill) even if winutils crashes.
+  if(CreateProcess(NULL, cmdLine, NULL, NULL, TRUE, 0, NULL, NULL, &si, &pi) == 0)
+  {
+    err = GetLastError();
+    CloseHandle(jobObject);
+    return err;
+  }
+  // The primary thread handle is not needed; only the process is waited on.
+  CloseHandle(pi.hThread);
+
+  // Wait until child process exits.
+  WaitForSingleObject( pi.hProcess, INFINITE );
+  if(GetExitCodeProcess(pi.hProcess, &exitCode) == 0)
+  {
+    err = GetLastError();
+  }
+  CloseHandle( pi.hProcess );
+
+  // Terminate job object so that all spawned processes are also killed.
+  // This is needed because once this process closes the handle to the job 
+  // object and none of the spawned objects have the handle open (via 
+  // inheritance on creation) then it will not be possible for any other external 
+  // program (say winutils task kill) to terminate this job object via its name.
+  if(TerminateJobObject(jobObject, exitCode) == 0)
+  {
+    err = GetLastError();
+  }
+
+  // Execution is expected to reach this point only if TerminateJobObject
+  // failed: this process was assigned to the job above, so a successful
+  // termination presumably ends this process as well (with exitCode).
+  CloseHandle(jobObject);
+
+  if(err != ERROR_SUCCESS)
+  {
+    return err;
+  }
+  return exitCode;
+}
+
+//----------------------------------------------------------------------------
+// Function: isTaskAlive
+//
+// Description:
+//  Checks if a task is alive by opening its job object by name and querying
+//  the number of processes assigned to it. The result is returned through
+//  the out params; nothing is printed by this function.
+//
+// Parameters:
+//  jobObjName: name of the job object that represents the task
+//  isAlive:    set to TRUE if the job object has at least one assigned
+//              process, FALSE otherwise (including when the job object does
+//              not exist)
+//  procsInJob: set to the number of processes assigned to the job object,
+//              0 if the task is not alive
+//
+// Returns:
+// ERROR_SUCCESS: On success
+// GetLastError: otherwise
+DWORD isTaskAlive(const _TCHAR* jobObjName, int* isAlive, int* procsInJob)
+{
+  PJOBOBJECT_BASIC_PROCESS_ID_LIST procList;
+  HANDLE jobObject = NULL;
+  int numProcs = 100;
+  // Size of the query buffer: the list header plus room for numProcs
+  // entries (32 bytes are reserved per entry).
+  DWORD procListSize =
+    (DWORD) (sizeof(JOBOBJECT_BASIC_PROCESS_ID_LIST) + numProcs*32);
+  DWORD err = ERROR_SUCCESS;
+
+  // Give both out params a defined value on every path.
+  *isAlive = FALSE;
+  *procsInJob = 0;
+  
+  jobObject = OpenJobObject(JOB_OBJECT_QUERY, FALSE, jobObjName);
+
+  if(jobObject == NULL)
+  {
+    err = GetLastError();
+    if(err == ERROR_FILE_NOT_FOUND)
+    {
+      // job object does not exist. assume it's not alive
+      return ERROR_SUCCESS;
+    }
+    return err;
+  }
+
+  procList = (PJOBOBJECT_BASIC_PROCESS_ID_LIST) LocalAlloc(LPTR, procListSize);
+  if (!procList)
+  {
+    err = GetLastError();
+    CloseHandle(jobObject);
+    return err;
+  }
+
+  if(QueryInformationJobObject(jobObject, JobObjectBasicProcessIdList,
+                               procList, procListSize, NULL) == 0)
+  {
+    err = GetLastError();
+    // ERROR_MORE_DATA only means the id list did not fit in the buffer; the
+    // code relies on NumberOfAssignedProcesses still being filled in, which
+    // is all that is needed here.
+    if(err != ERROR_MORE_DATA) 
+    {
+      CloseHandle(jobObject);
+      LocalFree(procList);
+      return err;
+    }
+  }
+
+  if(procList->NumberOfAssignedProcesses > 0)
+  {
+    *isAlive = TRUE;
+    *procsInJob = procList->NumberOfAssignedProcesses;
+  }
+
+  LocalFree(procList);
+  // Release the job object handle on the success path as well.
+  CloseHandle(jobObject);
+
+  return ERROR_SUCCESS;
+}
+
+//----------------------------------------------------------------------------
+// Function: killTask
+//
+// Description:
+//  Kills a task by opening its job object by name and terminating the job.
+//  A job object that does not exist is treated as an already dead task.
+//  Nothing is printed by this function.
+//
+// Parameters:
+//  jobObjName: name of the job object that represents the task
+//
+// Returns:
+// ERROR_SUCCESS: On success
+// GetLastError: otherwise
+DWORD killTask(_TCHAR* jobObjName)
+{
+  DWORD err = ERROR_SUCCESS;
+  HANDLE jobObject = OpenJobObject(JOB_OBJECT_TERMINATE, FALSE, jobObjName);
+  if(jobObject == NULL)
+  {
+    err = GetLastError();
+    if(err == ERROR_FILE_NOT_FOUND)
+    {
+      // job object does not exist. assume it's not alive
+      return ERROR_SUCCESS;
+    }
+    return err;
+  }
+
+  if(TerminateJobObject(jobObject, -1) == 0)
+  {
+    // Capture the error before CloseHandle gets a chance to overwrite it.
+    err = GetLastError();
+  }
+
+  // Close the handle on both the success and the failure path.
+  CloseHandle(jobObject);
+
+  return err;
+}
+
+//----------------------------------------------------------------------------
+// Function: Task
+//
+// Description:
+//  Entry point of the "task" command. Parses the command line and dispatches
+//  to createTask, isTaskAlive or killTask. Usage is printed when the command
+//  line is invalid; for isAlive, "IsAlive,<process count>" is written to
+//  stdout when the task is alive. Failures of the individual operations are
+//  reported through ReportErrorCode.
+//
+// Returns:
+// ERROR_SUCCESS: On success
+// Error code otherwise: otherwise
+int Task(int argc, wchar_t *argv[])
+{
+  TaskCommandOption command = TaskInvalid;
+  DWORD dwErrorCode = ERROR_SUCCESS;
+
+  if (!ParseCommandLine(argc, argv, &command))
+  {
+    fwprintf(stderr, L"Incorrect command line arguments.\n\n");
+    TaskUsage();
+    return ERROR_INVALID_COMMAND_LINE;
+  }
+
+  switch (command)
+  {
+  case TaskCreate:
+    // Create the task jobobject and run the command line inside it
+    dwErrorCode = createTask(argv[2], argv[3]);
+    if (dwErrorCode != ERROR_SUCCESS)
+    {
+      ReportErrorCode(L"createTask", dwErrorCode);
+    }
+    break;
+
+  case TaskIsAlive:
+  {
+    // Check if the task jobobject still has processes in it
+    int isAlive;
+    int numProcs;
+
+    dwErrorCode = isTaskAlive(argv[2], &isAlive, &numProcs);
+    if (dwErrorCode != ERROR_SUCCESS)
+    {
+      ReportErrorCode(L"isTaskAlive", dwErrorCode);
+      break;
+    }
+
+    // Output the result; nothing is printed for a task that is not alive
+    if (isAlive == TRUE)
+    {
+      fwprintf(stdout, L"IsAlive,%d\n", numProcs);
+    }
+    break;
+  }
+
+  case TaskKill:
+    // Terminate the task jobobject
+    dwErrorCode = killTask(argv[2]);
+    if (dwErrorCode != ERROR_SUCCESS)
+    {
+      ReportErrorCode(L"killTask", dwErrorCode);
+    }
+    break;
+
+  default:
+    // Should not happen: ParseCommandLine only succeeds for the
+    // commands handled above
+    assert(FALSE);
+    break;
+  }
+
+  return dwErrorCode;
+}
+
+// Prints the usage text of the "task" command (create / isAlive / kill)
+// to stdout.
+void TaskUsage()
+{
+  // Hadoop code checks for this string to determine if
+  // job objects are being used; see ProcessTree.isSetsidSupported().
+  // NOTE(review): presumably the text below must stay in sync with that
+  // check - confirm against ProcessTree before rewording it.
+  fwprintf(stdout, L"\
+    Usage: task create [TASKNAME] [COMMAND_LINE] |\n\
+          task isAlive [TASKNAME] |\n\
+          task kill [TASKNAME]\n\
+    Creates a new task jobobject with taskname\n\
+    Checks if task jobobject is alive\n\
+    Kills task jobobject\n");
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-1-win/src/winutils/winutils.vcxproj
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/winutils/winutils.vcxproj?rev=1350835&r1=1350834&r2=1350835&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/winutils/winutils.vcxproj (original)
+++ hadoop/common/branches/branch-1-win/src/winutils/winutils.vcxproj Sat Jun 16 00:33:15
2012
@@ -140,6 +140,7 @@
     <ClCompile Include="common.c" />
     <ClCompile Include="groups.c" />
     <ClCompile Include="hardlink.c" />
+    <ClCompile Include="task.c" />
     <ClCompile Include="ls.c" />
     <ClCompile Include="main.c" />
   </ItemGroup>
@@ -149,4 +150,4 @@
   <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
   <ImportGroup Label="ExtensionTargets">
   </ImportGroup>
-</Project>
\ No newline at end of file
+</Project>



Mime
View raw message