hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1129593 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-clie...
Date Tue, 31 May 2011 09:55:15 GMT
Author: vinodkv
Date: Tue May 31 09:55:15 2011
New Revision: 1129593

URL: http://svn.apache.org/viewvc?rev=1129593&view=rev
Log:
Making pipes work with YARN. Changed pipes to get log locations from an environment variable.
(vinodkv)

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Constants.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
    hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1129593&r1=1129592&r2=1129593&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue May 31 09:55:15 2011
@@ -4,6 +4,9 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    Making pipes work with YARN. Changed pipes to get log-locations from an
+    environmental variable. (vinodkv)
+
     Fixed NPE in CS by checking Application state before scheduling and fixing
     synchronization in CS. (acmurthy)
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1129593&r1=1129592&r2=1129593&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Tue May 31 09:55:15 2011
@@ -130,9 +130,10 @@ public class MapReduceChildJVM {
     // Long term we will need to get it from the Child
     env.put("JVM_PID", "12344");
 
-    env.put(Constants.HADOOP_WORK_DIR, "."); // This should work. TODO: Find
-                                              // why the var is introduced. Not
-                                              // used in tests, for e.g.
+    env.put(Constants.STDOUT_LOGFILE_ENV,
+        getTaskLogFile(containerLogDir, TaskLog.LogName.STDOUT).toString());
+    env.put(Constants.STDERR_LOGFILE_ENV,
+        getTaskLogFile(containerLogDir, TaskLog.LogName.STDERR).toString());
   }
 
   private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1129593&r1=1129592&r2=1129593&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Tue May 31 09:55:15 2011
@@ -85,10 +85,6 @@ class YarnChild {
     DefaultMetricsSystem.initialize(
         StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");
 
-    if (null == System.getenv().get(Constants.HADOOP_WORK_DIR)) {
-      throw new IOException("Environment variable " +
-          Constants.HADOOP_WORK_DIR + " is not set");
-    }
     Token<JobTokenIdentifier> jt = loadCredentials(defaultConf, address);
 
     // Create TaskUmbilicalProtocol as actual task owner.

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Constants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Constants.java?rev=1129593&r1=1129592&r2=1129593&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Constants.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Constants.java Tue May 31 09:55:15 2011
@@ -1,7 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.mapred;
 
 public class Constants {
   static final String OUTPUT = "output";
   public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
   public static final String JOBFILE = "job.xml";
+  public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
+  public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java?rev=1129593&r1=1129592&r2=1129593&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java Tue May 31 09:55:15 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.io.FloatWritabl
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Constants;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.RecordReader;
@@ -121,16 +122,15 @@ class Application<K1 extends WritableCom
     cmd.add(executable);
     // wrap the command in a stdout/stderr capture
     // we are starting map/reduce task of the pipes job. this is not a cleanup
-    // attempt. 
-    TaskAttemptID taskid = 
-      TaskAttemptID.forName(conf.get(MRJobConfig.TASK_ATTEMPT_ID));
-    File stdout = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDOUT);
-    File stderr = TaskLog.getTaskLogFile(taskid, false, TaskLog.LogName.STDERR);
+    // attempt.
+    File stdout = new File(System.getenv(Constants.STDOUT_LOGFILE_ENV));
+    File stderr = new File(System.getenv(Constants.STDERR_LOGFILE_ENV));
     long logLength = TaskLog.getTaskLogLength(conf);
     cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
                                      false);
 
     process = runClient(cmd, env);
+    // TODO: BUG, BUG!! If the process crashes. You are screwed.
     clientSocket = serverSocket.accept();
     
     String challenge = getSecurityChallenge();
@@ -145,7 +145,6 @@ class Application<K1 extends WritableCom
       ReflectionUtils.newInstance(outputValueClass, conf);
     downlink = new BinaryProtocol<K1, V1, K2, V2>(clientSocket, handler, 
                                   outputKey, outputValue, conf);
-    
     downlink.authenticate(digestToSend, challenge);
     waitForAuthentication();
     LOG.debug("Authentication succeeded");

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java?rev=1129593&r1=1129592&r2=1129593&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/PipesMapRunner.java Tue May 31 09:55:15 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapred.pipes;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -62,6 +64,7 @@ class PipesMapRunner<K1 extends Writable
                   Reporter reporter) throws IOException {
     Application<K1, V1, K2, V2> application = null;
     try {
+
       RecordReader<FloatWritable, NullWritable> fakeInput = 
         (!Submitter.getIsJavaRecordReader(job) && 
          !Submitter.getIsJavaMapper(job)) ? 
@@ -85,7 +88,6 @@ class PipesMapRunner<K1 extends Writable
         V1 value = input.createValue();
         downlink.setInputTypes(key.getClass().getName(),
                                value.getClass().getName());
-        
         while (input.next(key, value)) {
           // map pair to output
           downlink.mapItem(key, value);

Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=1129593&r1=1129592&r2=1129593&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue May 31 09:55:15 2011
@@ -193,7 +193,7 @@ abstract class TaskRunner extends Thread
 
       Map<String, String> env = new HashMap<String, String>();
       errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
-                                   taskid, logSize);
+                                   taskid, stdout, stderr, logSize);
 
       // flatten the env as a set of export commands
       List <String> setupCmds = new ArrayList<String>();
@@ -504,12 +504,14 @@ abstract class TaskRunner extends Thread
    * @param errorInfo
    * @param workDir
    * @param env
+   * @param stderr 
+   * @param stdout 
    * @return
    * @throws Throwable
    */
   private String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
-      Map<String, String> env, TaskAttemptID taskid, long logSize)
-      throws Throwable {
+      Map<String, String> env, TaskAttemptID taskid, File stdout, File stderr, 
+      long logSize) throws Throwable {
     StringBuffer ldLibraryPath = new StringBuffer();
     ldLibraryPath.append(workDir.toString());
     String oldLdLibraryPath = null;
@@ -520,6 +522,9 @@ abstract class TaskRunner extends Thread
     }
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
     env.put(Constants.HADOOP_WORK_DIR, workDir.toString());
+    // Set the stdout and stderr file names. Useful in pipes.
+    env.put(Constants.STDOUT_LOGFILE_ENV, stdout.getAbsolutePath());
+    env.put(Constants.STDERR_LOGFILE_ENV, stderr.getAbsolutePath());
     
     // put jobTokenFile name into env
     String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1129593&r1=1129592&r2=1129593&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Tue May 31 09:55:15 2011
@@ -27,8 +27,10 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
@@ -93,20 +95,31 @@ public class ContainerLaunch implements 
     try {
       // /////////////////////////// Variable expansion
       // Before the container script gets written out.
-      List<String> cmds = container.getLaunchContext().getCommandList();
-      List<String> newCmds = new ArrayList<String>(cmds.size());
-      String containerIdStr = ConverterUtils.toString(container.getContainerID());
+      List<String> newCmds = new ArrayList<String>(command.size());
+      String containerIdStr =
+          ConverterUtils.toString(container.getContainerID());
       String appIdStr = app.toString();
       Path containerLogDir =
           this.logDirsSelector.getLocalPathForWrite(appIdStr + Path.SEPARATOR
               + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, this.conf,
               false);
-      for (String str : cmds) {
+      for (String str : command) {
         newCmds.add(str.replace("<LOG_DIR>", containerLogDir.toUri()
             .getPath()));
       }
-      container.getLaunchContext().clearCommands();
-      container.getLaunchContext().addAllCommands(newCmds);
+      launchContext.clearCommands();
+      launchContext.addAllCommands(newCmds);
+
+      Map<String, String> envs = launchContext.getAllEnv();
+      Map<String, String> newEnvs = new HashMap<String, String>(envs.size());
+      for (Entry<String, String> entry : envs.entrySet()) {
+        newEnvs.put(
+            entry.getKey(),
+            entry.getValue().replace("<LOG_DIR>",
+                containerLogDir.toUri().getPath()));
+      }
+      launchContext.clearEnv();
+      launchContext.addAllEnv(newEnvs);
       // /////////////////////////// End of variable expansion
 
       FileContext lfs = FileContext.getLocalFSFileContext();
@@ -193,9 +206,11 @@ public class ContainerLaunch implements 
             ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret));
       return ret;
     }
-    LOG.info("Container " + container + " succeeded " + launchContext.getContainerId());
-    dispatcher.getEventHandler().handle(new ContainerEvent(
-          launchContext.getContainerId(), ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
+    LOG.info("Container " + container + " succeeded "
+        + launchContext.getContainerId());
+    dispatcher.getEventHandler().handle(
+        new ContainerEvent(launchContext.getContainerId(),
+            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
     return 0;
   }
 



Mime
View raw message