hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ey...@apache.org
Subject svn commit: r1602529 - in /hadoop/common/branches/branch-1.2: ./ src/c++/task-controller/impl/ src/mapred/org/apache/hadoop/mapred/
Date Fri, 13 Jun 2014 21:36:37 GMT
Author: eyang
Date: Fri Jun 13 21:36:36 2014
New Revision: 1602529

URL: http://svn.apache.org/r1602529
Log:
MAPREDUCE-4490. Fixed LinuxTaskController to re-initialize user log
directory when JVM reuse option is enabled.  (Sam Liu via eyang)


Modified:
    hadoop/common/branches/branch-1.2/CHANGES.txt
    hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/main.c
    hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/task-controller.c
    hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/task-controller.h
    hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java

Modified: hadoop/common/branches/branch-1.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/CHANGES.txt?rev=1602529&r1=1602528&r2=1602529&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.2/CHANGES.txt Fri Jun 13 21:36:36 2014
@@ -12,6 +12,9 @@ Release 1.2.2 - unreleased
 
   BUG FIXES 
 
+    MAPREDUCE-4490. Fixed LinuxTaskController to re-initialize user log
+    directory when JVM reuse option is enabled.  (Sam Liu via eyang)
+
     HADOOP-9863. Backport HADOOP-8686 to support BigEndian on ppc64. 
     (Yu Li via eyang)
 

Modified: hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/c%2B%2B/task-controller/impl/main.c?rev=1602529&r1=1602528&r2=1602529&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/main.c (original)
+++ hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/main.c Fri Jun 13 21:36:36
2014
@@ -51,6 +51,8 @@ void display_usage(FILE *stream) {
 	  DELETE_LOG_AS_USER);
   fprintf(stream, "   run command as user:  %2d cmd args\n",
 	  RUN_COMMAND_AS_USER);
+  fprintf(stream, "   initialize task:      %2d user relative-path jobid taskid\n",
+	  INITIALIZE_TASK);
 }
 
 int main(int argc, char **argv) {
@@ -197,6 +199,15 @@ int main(int argc, char **argv) {
   case RUN_COMMAND_AS_USER:
     exit_code = run_command_as_user(user_detail->pw_name, argv + optind);
     break;
+  case INITIALIZE_TASK:
+    if (argc < 6) {
+      fprintf(LOGFILE, "Too few arguments (%d vs 6) for initializing task\n", argc);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    job_id = argv[optind++];
+    task_id = argv[optind++];
+    exit_code = initialize_task(user_detail->pw_name, good_local_dirs, job_id, task_id);
+    break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;
   }

Modified: hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/c%2B%2B/task-controller/impl/task-controller.c?rev=1602529&r1=1602528&r2=1602529&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/task-controller.c (original)
+++ hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/task-controller.c Fri Jun
13 21:36:36 2014
@@ -507,6 +507,13 @@ int create_attempt_directories(const cha
   return result;
 }
 
+int initialize_task(const char* user,
+  const char * good_local_dirs, const char *job_id, const char *task_id) {
+  // Prepare the attempt directories for the task JVM.
+  int result = create_attempt_directories(user, good_local_dirs, job_id, task_id);
+  return result;
+}
+
 /**
  * Load the user information for a given user name.
  */
@@ -883,9 +890,6 @@ int run_task_as_user(const char *user, c
                      const char *work_dir, const char *script_name) {
   int exit_code = -1;
   char *task_script_path = NULL;
-  if (create_attempt_directories(user, good_local_dirs, job_id, task_id) != 0) {
-    goto cleanup;
-  }
   int task_file_source = open_file_as_task_tracker(script_name);
   if (task_file_source == -1) {
     goto cleanup;

Modified: hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/c%2B%2B/task-controller/impl/task-controller.h?rev=1602529&r1=1602528&r2=1602529&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/task-controller.h (original)
+++ hadoop/common/branches/branch-1.2/src/c++/task-controller/impl/task-controller.h Fri Jun
13 21:36:36 2014
@@ -26,7 +26,8 @@ enum command {
   SIGNAL_TASK = 2,
   DELETE_AS_USER = 3,
   DELETE_LOG_AS_USER = 4,
-  RUN_COMMAND_AS_USER = 5
+  RUN_COMMAND_AS_USER = 5,
+  INITIALIZE_TASK = 6
 };
 
 enum errorcodes {
@@ -159,3 +160,9 @@ int change_user(uid_t user, gid_t group)
  */
 int create_attempt_directories(const char* user,
 	const char * good_local_dirs, const char *job_id, const char *task_id);
+
+/**
+ * Initialize the task directory
+ */
+int initialize_task(const char* user,
+ 	const char * good_local_dirs, const char *job_id, const char *task_id);

Modified: hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1602529&r1=1602528&r2=1602529&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
(original)
+++ hadoop/common/branches/branch-1.2/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
Fri Jun 13 21:36:36 2014
@@ -23,6 +23,8 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -70,6 +72,9 @@ class LinuxTaskController extends TaskCo
   private static final String TASK_CONTROLLER_EXEC_KEY =
     "mapreduce.tasktracker.task-controller.exe";
   
+  private static Map<String, String> jobUserMap = new HashMap<String, String>();

+  private static File currentWorkDirectory; 
+  
   @Override
   public void setConf(Configuration conf) {
     super.setConf(conf);
@@ -93,7 +98,8 @@ class LinuxTaskController extends TaskCo
     SIGNAL_TASK(2),
     DELETE_AS_USER(3),
     DELETE_LOG_AS_USER(4),
-    RUN_COMMAND_AS_USER(5);
+    RUN_COMMAND_AS_USER(5),
+    INITIALIZE_TASK(6);
 
     private int value;
     Commands(int value) {
@@ -154,6 +160,8 @@ class LinuxTaskController extends TaskCo
                             Path jobConf, TaskUmbilicalProtocol taskTracker,
                             InetSocketAddress ttAddr
                             ) throws IOException {
+    jobUserMap.put(jobid, user);
+
     List<String> command = new ArrayList<String>(
       Arrays.asList(taskControllerExe, 
                     user,
@@ -264,6 +272,18 @@ class LinuxTaskController extends TaskCo
   public void createLogDir(TaskAttemptID taskID,
                            boolean isCleanup) throws IOException {
     // Log dirs are created during attempt dir creation when running the task
+    String[] command = 
+      new String[]{taskControllerExe, 
+          jobUserMap.get(taskID.getJobID().toString()),
+                   localStorage.getDirsString(),
+                   Integer.toString(Commands.INITIALIZE_TASK.getValue()),
+                   taskID.getJobID().toString(),
+                   taskID.toString()};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("createLogDir: " + Arrays.toString(command));
+    }
+    shExec.execute();
   }
 
   @Override



Mime
View raw message