tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-214. Make TEZ a client-side only deployment. (hitesh)
Date Fri, 14 Jun 2013 20:07:31 GMT
Updated Branches:
  refs/heads/master ca8bb521a -> 3225553bf


TEZ-214. Make TEZ a client-side only deployment. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3225553b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3225553b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3225553b

Branch: refs/heads/master
Commit: 3225553bfa751f52466a24d40bbf960338063afc
Parents: ca8bb52
Author: Hitesh Shah <hitesh@apache.org>
Authored: Fri Jun 14 13:07:08 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Fri Jun 14 13:07:08 2013 -0700

----------------------------------------------------------------------
 INSTALL.txt                                     |  26 ++--
 .../java/org/apache/tez/client/TezClient.java   | 130 ++++++++++++++---
 .../main/java/org/apache/tez/dag/api/DAG.java   |   7 +
 .../apache/tez/dag/api/TezConfiguration.java    |  15 +-
 .../apache/hadoop/mapred/YarnTezDagChild.java   |   2 +-
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |  21 ++-
 .../org/apache/tez/mapreduce/YARNRunner.java    | 142 ++-----------------
 7 files changed, 170 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3225553b/INSTALL.txt
----------------------------------------------------------------------
diff --git a/INSTALL.txt b/INSTALL.txt
index e958775..e3082ca 100644
--- a/INSTALL.txt
+++ b/INSTALL.txt
@@ -2,7 +2,7 @@ How to run MR on TEZ
 =======================
 
 Tez provides an ApplicationMaster that can run MR or MRR jobs. There is a
-translation layer implemented ( may have bugs so please file JIRAs if you 
+translation layer implemented ( may have bugs so please file JIRAs if you
 come across any issues ) that allows a user to run an MR job against the TEZ
 DAG ApplicationMaster.
 
@@ -14,20 +14,30 @@ Install/Deploy Instructions
 ===========================
 
 1) Deploy Apache Hadoop using the 3.0.0-SNAPSHOT from trunk.
-2) Copy the tez jars and their dependencies from $TEZ_SRC/tez-dist/target/tez-0.2.0-SNAPSHOT/
-   either into $HADOOP_PREFIX/share/hadoop/common/lib/ or modify HADOOP_CLASSPATH
-   in hadoop-env.sh to point to the location where the TEZ jars and dependencies
-   can be found.
+   - You can also use the 2.1.0 branch. For this, the only change will be to build
+     tez with the -Dhadoop.version set to the correct version matching the hadoop
+     branch being used.
+2) Copy the tez jars and their dependencies into HDFS.
+3) Configure tez-site.xml to set tez.lib.uris to point to the paths in HDFS containing
+   the jars. Please note that the paths are not searched recursively so for <basedir>
+   and <basedir>/lib/, you will need to configure the 2 paths as a comma-separated list.
 3) Modify mapred-site.xml to change "mapreduce.framework.name" property from its
    default value of "yarn" to "yarn-tez"
-4) Submit a MR job as you normally would using something like:
+4) Set HADOOP_CLASSPATH to include the following paths:
+   - TEZ_CONF_DIR - location of tez-site.xml
+   - TEZ_JARS and TEZ_JARS/lib - location of the tez jars and dependencies.
+5) Submit a MR job as you normally would using something like:
 
 $HADOOP_PREFIX/bin/hadoop jar hadoop-mapreduce-client-jobclient-3.0.0-SNAPSHOT-tests.jar sleep -mt 1 -rt 1 -m 1 -r 1
 
 This will use the TEZ DAG ApplicationMaster to run the MR job. This can be
 verified by looking at the AM's logs from the YARN ResourceManager UI.
 
-5) There is a basic example of using an MRR job in the tez-mapreduce-examples.jar. Refer to WordCountMRRTest.java
+6) There is a basic example of using an MRR job in the tez-mapreduce-examples.jar. Refer to OrderedWordCount.java
 in the source code. To run this example:
 
-$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples-0.2.0-SNAPSHOT.jar wordcountmrrtest <in> <out>
+$HADOOP_PREFIX/bin/hadoop jar tez-mapreduce-examples.jar orderedwordcount <input> <output>
+
+This will use the TEZ DAG ApplicationMaster to run the ordered word count job. This job is similar
+to the word count example except that it also orders all words based on the frequency of
+occurrence.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3225553b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
index b3fe4f3..7ef7fba 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -18,7 +18,10 @@
 
 package org.apache.tez.client;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.text.NumberFormat;
 import java.util.ArrayList;
@@ -31,11 +34,12 @@ import java.util.Vector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
@@ -60,9 +64,13 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
+import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 
 public class TezClient {
   private static final Log LOG = LogFactory.getLog(TezClient.class);
@@ -167,18 +175,6 @@ public class TezClient {
     vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 0);
     vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
   }
-
-  private void addTezClasspathToEnv(Configuration conf,
-      Map<String, String> environment) {
-    for (String c : conf.getStrings(
-        TezConfiguration.TEZ_APPLICATION_CLASSPATH,
-        TezConfiguration.DEFAULT_TEZ_APPLICATION_CLASSPATH)) {
-      // TEZ-194 - TezConfiguration.DEFAULT_TEZ_APPLICATION_CLASSPATH references
-      // TEZ_HOME_ENV etc which is not really expanded nor defined as ENV_VARS
-      Apps.addToEnvironment(environment,
-          ApplicationConstants.Environment.CLASSPATH.name(), c.trim());
-    }
-  }
   
   public FileSystem ensureExists(Path stagingArea)
       throws IOException {
@@ -223,6 +219,73 @@ public class TezClient {
     return rsrc;
   }
 
+  private Map<String, LocalResource> setupTezJarsLocalResources()
+      throws IOException {
+    Map<String, LocalResource> tezJarResources =
+        new TreeMap<String, LocalResource>();
+    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+      return tezJarResources;
+    }
+
+    // Add tez jars to local resource
+    String[] tezJarUris = conf.getStrings(
+        TezConfiguration.TEZ_LIB_URIS);
+    if (tezJarUris == null
+        || tezJarUris.length == 0) {
+      throw new TezUncheckedException("Invalid configuration of tez jars"
+          + ", " + TezConfiguration.TEZ_LIB_URIS
+          + " is not defined in the configurartion");
+    }
+
+    for (String tezJarUri : tezJarUris) {
+      URI uri;
+      try {
+        uri = new URI(tezJarUri.trim());
+      } catch (URISyntaxException e) {
+        String message = "Invalid URI defined in configuration for"
+            + " location of TEZ jars. providedURI=" + tezJarUri;
+        LOG.error(message);
+        throw new TezUncheckedException(message, e);
+      }
+      if (!uri.isAbsolute()) {
+        String message = "Non-absolute URI defined in configuration for"
+            + " location of TEZ jars. providedURI=" + tezJarUri;
+        LOG.error(message);
+        throw new TezUncheckedException(message);
+      }
+      Path p = new Path(uri);
+      FileSystem pathfs = p.getFileSystem(conf);
+      RemoteIterator<LocatedFileStatus> iter = pathfs.listFiles(p, false);
+      while (iter.hasNext()) {
+        LocatedFileStatus fStatus = iter.next();
+        String rsrcName = fStatus.getPath().getName();
+        // FIXME currently not checking for duplicates due to quirks
+        // in assembly generation
+        if (tezJarResources.containsKey(rsrcName)) {
+          String message = "Duplicate resource found"
+              + ", resourceName=" + rsrcName
+              + ", existingPath=" +
+              tezJarResources.get(rsrcName).getResource().toString()
+              + ", newPath=" + fStatus.getPath();
+          LOG.warn(message);
+          // throw new TezUncheckedException(message);
+        }
+        tezJarResources.put(rsrcName,
+            LocalResource.newInstance(
+                ConverterUtils.getYarnUrlFromPath(fStatus.getPath()),
+                LocalResourceType.FILE,
+                LocalResourceVisibility.PUBLIC,
+                fStatus.getLen(),
+                fStatus.getModificationTime()));
+      }
+    }
+    if (tezJarResources.isEmpty()) {
+      LOG.warn("No tez jars found in configured locations"
+          + ". Ignoring for now. Errors may occur");
+    }
+    return tezJarResources;
+  }
+
   private ApplicationSubmissionContext createApplicationSubmissionContext(
       ApplicationId appId, DAG dag, Path appStagingDir, Credentials ts,
       String amQueueName, String amName, List<String> amArgs,
@@ -262,9 +325,9 @@ public class TezClient {
 
     vargs.add(TezConfiguration.DAG_APPLICATION_MASTER_CLASS);
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
-        Path.SEPARATOR + ApplicationConstants.STDOUT);
+        File.separator + ApplicationConstants.STDOUT);
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
-        Path.SEPARATOR + ApplicationConstants.STDERR);
+        File.separator + ApplicationConstants.STDERR);
 
 
     Vector<String> vargsFinal = new Vector<String>(8);
@@ -281,8 +344,32 @@ public class TezClient {
     // Setup the CLASSPATH in environment
     // i.e. add { Hadoop jars, job jar, CWD } to classpath.
     Map<String, String> environment = new HashMap<String, String>();
-    addTezClasspathToEnv(conf, environment);
-    
+
+    boolean isMiniCluster =
+        conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false);
+    if (isMiniCluster) {
+      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+          System.getProperty("java.class.path"));
+    }
+
+    Apps.addToEnvironment(environment,
+        Environment.CLASSPATH.name(),
+        Environment.PWD.$());
+
+    // Add YARN/COMMON/HDFS jars to path
+    if (!isMiniCluster) {
+      for (String c : conf.getStrings(
+          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+            c.trim());
+      }
+    }
+
+    Apps.addToEnvironment(environment,
+        Environment.CLASSPATH.name(),
+        Environment.PWD.$() + File.separator + "*");
+
     for (Map.Entry<String, String> entry : amEnv.entrySet()) {
       Apps.addToEnvironment(environment, entry.getKey(), entry.getValue());
     }
@@ -291,6 +378,14 @@ public class TezClient {
         new TreeMap<String, LocalResource>();
 
     localResources.putAll(amLocalResources);
+    Map<String, LocalResource> tezJarResources =
+        setupTezJarsLocalResources();
+    localResources.putAll(tezJarResources);
+
+    // Add tez jars to vertices too
+    for (Vertex v : dag.getVertices()) {
+      v.getTaskLocalResources().putAll(tezJarResources);
+    }
     
     // emit protobuf DAG file style
     Path binaryPath =  new Path(appStagingDir,
@@ -298,6 +393,7 @@ public class TezClient {
     dag.addConfiguration(TezConfiguration.DAG_AM_PLAN_REMOTE_PATH,
         binaryPath.toUri().toString());
     DAGPlan dagPB = dag.createDag();
+
     FSDataOutputStream dagPBOutBinaryStream = null;
     
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3225553b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 9369c8f..50b8fc7 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -19,12 +19,14 @@ package org.apache.tez.dag.api;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Stack;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -60,6 +62,11 @@ public class DAG { // FIXME rename to Topology
     vertices.add(vertex);
     return this;
   }
+
+  @Private
+  public synchronized List<Vertex> getVertices() {
+    return Collections.unmodifiableList(this.vertices);
+  }
   
   public synchronized DAG addEdge(Edge edge) {
     // Sanity checks

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3225553b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 4bf156a..945558f 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -113,17 +113,6 @@ public class TezConfiguration extends Configuration {
   public static final float 
           SLOWSTART_VERTEX_SCHEDULER_MAX_SRC_FRACTION_DEFAULT = 0.8f;
 
-  private static final String TEZ_CONF_DIR_ENV = "TEZ_CONF_DIR";
-  private static final String TEZ_HOME_ENV = "TEZ_HOME";
-
-  public static final String TEZ_APPLICATION_CLASSPATH = TEZ_PREFIX
-      + "application.classpath";
-  public static final String[] DEFAULT_TEZ_APPLICATION_CLASSPATH = {
-    TEZ_CONF_DIR_ENV,
-    TEZ_HOME_ENV + "/*",
-    TEZ_HOME_ENV + "/lib/*"
-  };
-
   /**
    * The complete path to the serialized dag plan file
    * <code>DAG_AM_PLAN_PB_BINARY</code>. Used to make the plan available to
@@ -135,4 +124,8 @@ public class TezConfiguration extends Configuration {
 
   public static final String DAG_AM_PLAN_PB_BINARY = "tez-dag.pb";
   public static final String DAG_AM_PLAN_PB_TEXT = "tez-dag.pb.txt";
+
+  public static final String TEZ_LIB_URIS =
+      TEZ_PREFIX + "lib.uris";
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3225553b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index dc5968b..2eec34a 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -264,7 +264,7 @@ public class YarnTezDagChild {
       Credentials cxredentials, Token<JobTokenIdentifier> jobToken,
       int appAttemptId) throws IOException, InterruptedException {
 
-    Configuration conf = new Configuration(false);
+    Configuration conf = new Configuration();
     // set tcp nodelay
     conf.setBoolean("ipc.client.tcpnodelay", true);
     conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3225553b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 4a9666c..475b3a0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.mapreduce.hadoop;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
@@ -542,7 +543,9 @@ public class MRHelpers {
   public static void updateEnvironmentForMRTasks(Configuration conf,
       Map<String, String> environment, boolean isMap) {
 
-    if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+    boolean isMiniCluster =
+        conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false);
+    if (isMiniCluster) {
       Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
           System.getProperty("java.class.path"));
     }
@@ -554,13 +557,19 @@ public class MRHelpers {
         Environment.PWD.$());
 
     // Add YARN/COMMON/HDFS jars to path
-    for (String c : conf.getStrings(
-        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
-      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
-          c.trim());
+    if (!isMiniCluster) {
+      for (String c : conf.getStrings(
+          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+            c.trim());
+      }
     }
 
+    Apps.addToEnvironment(environment,
+        Environment.CLASSPATH.name(),
+        Environment.PWD.$() + File.separator + "*");
+
     // Shell
     environment.put(Environment.SHELL.name(), conf.get(
         MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3225553b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index ae53fb2..aa36842 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 import java.util.Vector;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -74,7 +73,6 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -84,7 +82,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.dag.api.DAG;
@@ -120,10 +117,6 @@ public class YARNRunner implements ClientProtocol {
   private Configuration conf;
   private final FileContext defaultFileContext;
 
-  private static final Object classpathLock = new Object();
-  private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
-  private static String initialClasspath = null;
-
   final public static FsPermission DAG_FILE_PERMISSION =
       FsPermission.createImmutable((short) 0644);
   final public static int UTF8_CHUNK_SIZE = 16 * 1024;
@@ -345,48 +338,6 @@ public class YARNRunner implements ClientProtocol {
     return locationHints;
   }
 
-  private static String getInitialClasspath(Configuration conf)
-      throws IOException {
-    synchronized (classpathLock) {
-      if (initialClasspathFlag.get()) {
-        return initialClasspath;
-      }
-      Map<String, String> env = new HashMap<String, String>();
-      MRApps.setClasspath(env, conf);
-      initialClasspath = env.get(Environment.CLASSPATH.name());
-      initialClasspathFlag.set(true);
-      return initialClasspath;
-    }
-  }
-
-  private void setupCommonChildEnv(Configuration conf,
-      Map<String, String> environment) throws IOException {
-
-      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
-          getInitialClasspath(conf));
-
-    // Shell
-    environment.put(Environment.SHELL.name(), conf.get(
-        MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
-
-    // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
-    Apps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(),
-        Environment.PWD.$());
-
-    // Add the env variables passed by the admin
-    Apps.setEnvFromInputString(environment, conf.get(
-        MRJobConfig.MAPRED_ADMIN_USER_ENV,
-        MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV));
-
-  }
-
-  private static String getChildEnv(Configuration jobConf, boolean isMap) {
-    if (isMap) {
-      return jobConf.get(MRJobConfig.MAP_ENV, "");
-    }
-    return jobConf.get(MRJobConfig.REDUCE_ENV, "");
-  }
-
   private static String getChildLogLevel(Configuration conf, boolean isMap) {
     if (isMap) {
       return conf.get(
@@ -407,76 +358,29 @@ public class YARNRunner implements ClientProtocol {
 
     if (isMap) {
       warnForJavaLibPath(
-          conf.get(MRJobConfig.MAP_JAVA_OPTS,""),
+          jobConf.get(MRJobConfig.MAP_JAVA_OPTS,""),
           "map",
           MRJobConfig.MAP_JAVA_OPTS,
           MRJobConfig.MAP_ENV);
       warnForJavaLibPath(
-          conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""),
+          jobConf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""),
           "map",
           MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
           MRJobConfig.MAPRED_ADMIN_USER_ENV);
     } else {
       warnForJavaLibPath(
-          conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""),
+          jobConf.get(MRJobConfig.REDUCE_JAVA_OPTS,""),
           "reduce",
           MRJobConfig.REDUCE_JAVA_OPTS,
           MRJobConfig.REDUCE_ENV);
       warnForJavaLibPath(
-          conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""),
+          jobConf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""),
           "reduce",
           MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
           MRJobConfig.MAPRED_ADMIN_USER_ENV);
     }
 
-    setupCommonChildEnv(jobConf, environment);
-
-    // Add the env variables passed by the user
-    String mapredChildEnv = getChildEnv(jobConf, isMap);
-    Apps.setEnvFromInputString(environment, mapredChildEnv);
-
-    // Set logging level in the environment.
-    // This is so that, if the child forks another "bin/hadoop" (common in
-    // streaming) it will have the correct loglevel.
-    environment.put(
-        "HADOOP_ROOT_LOGGER",
-        getChildLogLevel(jobConf, isMap) + ",CLA");
-
-    // FIXME: don't think this is also needed given we already set java
-    // properties.
-    // TODO Change this not to use JobConf.
-    String log4jCmdLineProperties = getLog4jCmdLineProperties(jobConf, isMap);
-    StringBuffer buffer = new StringBuffer();
-    if (log4jCmdLineProperties != null && log4jCmdLineProperties != "") {
-      buffer.append(" " + log4jCmdLineProperties);
-    }
-
-    // FIXME supposedly required for streaming, should we remove it and let
-    // YARN set it for all containers?
-    String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
-    if (hadoopClientOpts == null) {
-      hadoopClientOpts = "";
-    } else {
-      hadoopClientOpts = hadoopClientOpts + " ";
-    }
-    hadoopClientOpts = hadoopClientOpts + buffer.toString();
-    //environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
-
-    // FIXME for this to work, we need YARN-561 and the task runtime changed
-    // to use YARN-561
-    // TODO TEZ-194 - addTezClasspathToEnv() probably does not work. 
-    addTezClasspathToEnv(conf, environment);
-    Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
-        getInitialClasspath(conf));
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Dumping out env for child, isMap=" + isMap);
-      for (Map.Entry<String, String> entry : environment.entrySet()) {
-        LOG.debug("Child env entry: "
-            + entry.getKey()
-            + "=" + entry.getValue());
-      }
-    }
+    MRHelpers.updateEnvironmentForMRTasks(jobConf, environment, isMap);
   }
 
   private Vertex configureIntermediateReduceStage(FileSystem fs, JobID jobId,
@@ -496,10 +400,7 @@ public class YARNRunner implements ClientProtocol {
     Map<String, String> reduceEnv = new HashMap<String, String>();
     setupMapReduceEnv(conf, reduceEnv, false);
 
-    Resource reduceResource = Resource.newInstance(conf.getInt(
-        MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB),
-        conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
-            MRJobConfig.DEFAULT_REDUCE_CPU_VCORES));
+    Resource reduceResource = MRHelpers.getReduceResource(conf);
 
     Map<String, LocalResource> reduceLocalResources = new TreeMap<String, LocalResource>();
     reduceLocalResources.putAll(jobLocalResources);
@@ -574,11 +475,7 @@ public class YARNRunner implements ClientProtocol {
     TaskLocationHint[] inputSplitLocations =
         getMapLocationHintsFromInputSplits(jobId, fs, jobConf, jobSubmitDir);
 
-    Resource mapResource = Resource.newInstance(
-        jobConf.getInt(MRJobConfig.MAP_MEMORY_MB,
-            MRJobConfig.DEFAULT_MAP_MEMORY_MB),
-        jobConf.getInt(MRJobConfig.MAP_CPU_VCORES,
-            MRJobConfig.DEFAULT_MAP_CPU_VCORES));
+    Resource mapResource = MRHelpers.getMapResource(jobConf);
 
     Map<String, LocalResource> mapLocalResources =
         new TreeMap<String, LocalResource>();
@@ -618,11 +515,7 @@ public class YARNRunner implements ClientProtocol {
       Map<String, String> reduceEnv = new HashMap<String, String>();
       setupMapReduceEnv(jobConf, reduceEnv, false);
 
-      Resource reduceResource = Resource.newInstance(
-          jobConf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
-              MRJobConfig.DEFAULT_REDUCE_MEMORY_MB),
-          jobConf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
-              MRJobConfig.DEFAULT_REDUCE_CPU_VCORES));
+      Resource reduceResource = MRHelpers.getReduceResource(jobConf);
 
       Map<String, LocalResource> reduceLocalResources =
           new TreeMap<String, LocalResource>();
@@ -676,16 +569,6 @@ public class YARNRunner implements ClientProtocol {
     return dag;
   }
 
-  private void addTezClasspathToEnv(Configuration conf,
-      Map<String, String> environment) {
-    for (String c : conf.getStrings(
-        TezConfiguration.TEZ_APPLICATION_CLASSPATH,
-        TezConfiguration.DEFAULT_TEZ_APPLICATION_CLASSPATH)) {
-      Apps.addToEnvironment(environment,
-          ApplicationConstants.Environment.CLASSPATH.name(), c.trim());
-    }
-  }
-
   private void setDAGParamsFromMRConf(DAG dag) {
     Configuration mrConf = this.conf;
     Map<String, String> mrParamToDAGParamMap = DeprecatedKeys.getMRToDAGParamMap();
@@ -764,8 +647,6 @@ public class YARNRunner implements ClientProtocol {
     // Setup the CLASSPATH in environment
     // i.e. add { Hadoop jars, job jar, CWD } to classpath.
     Map<String, String> environment = new HashMap<String, String>();
-    Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
-        getInitialClasspath(conf));
 
     // Setup the environment variables for Admin first
     MRApps.setEnvFromInputString(environment,
@@ -784,7 +665,8 @@ public class YARNRunner implements ClientProtocol {
           dag, 
           appStagingDir, 
           ts,
-          jobConf.get(JobContext.QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME),
+          jobConf.get(JobContext.QUEUE_NAME,
+              YarnConfiguration.DEFAULT_QUEUE_NAME),
           vargs, 
           environment, 
           jobLocalResources);
@@ -796,8 +678,8 @@ public class YARNRunner implements ClientProtocol {
     return getJobStatus(jobId);
   }
 
-  private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
-      throws IOException {
+  private LocalResource createApplicationResource(FileContext fs, Path p,
+      LocalResourceType type) throws IOException {
     LocalResource rsrc = Records.newRecord(LocalResource.class);
     FileStatus rsrcStat = fs.getFileStatus(p);
     rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs


Mime
View raw message