tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-345. Command line -D parameters no longer respected in the AM (bikas)
Date Wed, 07 Aug 2013 02:17:58 GMT
Updated Branches:
  refs/heads/master 6ce26bfae -> d9235851c


TEZ-345. Command line -D parameters no longer respected in the AM (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d9235851
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d9235851
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d9235851

Branch: refs/heads/master
Commit: d9235851ca8956ef5dab877cd72c0b66100393ea
Parents: 6ce26bf
Author: Bikas Saha <bikas@apache.org>
Authored: Tue Aug 6 19:17:29 2013 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Tue Aug 6 19:17:29 2013 -0700

----------------------------------------------------------------------
 tez-common/pom.xml                              |   4 +
 .../java/org/apache/tez/common/TezUtils.java    |  54 ++++++++++
 .../java/org/apache/tez/client/TezClient.java   | 108 +++++++++++--------
 .../main/java/org/apache/tez/dag/api/DAG.java   |  32 +++---
 .../apache/tez/dag/api/DagTypeConverters.java   |   6 +-
 .../apache/tez/dag/api/TezConfiguration.java    |   5 +-
 tez-dag-api/src/main/proto/DAGApiRecords.proto  |   7 +-
 .../apache/hadoop/mapred/YarnTezDagChild.java   |   2 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  15 ++-
 9 files changed, 164 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9235851/tez-common/pom.xml
----------------------------------------------------------------------
diff --git a/tez-common/pom.xml b/tez-common/pom.xml
index e1ca4c9..87f18d6 100644
--- a/tez-common/pom.xml
+++ b/tez-common/pom.xml
@@ -37,6 +37,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag-api</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9235851/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
new file mode 100644
index 0000000..c374754
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+
+public class TezUtils {
+  
+  public static void addUserSpecifiedTezConfiguration(Configuration conf) 
+      throws IOException {
+    FileInputStream confPBBinaryStream = null;
+    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+    try {
+      confPBBinaryStream = new FileInputStream(
+          TezConfiguration.TEZ_PB_BINARY_CONF_NAME);
+      confProtoBuilder.mergeFrom(confPBBinaryStream);
+    } finally {
+      if (confPBBinaryStream != null) {
+        confPBBinaryStream.close();
+      }
+    }
+
+    ConfigurationProto confProto = confProtoBuilder.build();
+
+    List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
+    if(kvPairList != null && !kvPairList.isEmpty()) {
+      for(PlanKeyValuePair kvPair : kvPairList) {
+        conf.set(kvPair.getKey(), kvPair.getValue());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9235851/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
index bc277f6..d7e2d33 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -72,7 +72,9 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientRPCImpl;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 
 public class TezClient {
   private static final Log LOG = LogFactory.getLog(TezClient.class);
@@ -253,7 +255,7 @@ public class TezClient {
     return fs;
   }
 
-  private LocalResource createApplicationResource(FileSystem fs, Path p,
+  private LocalResource createLocalResource(FileSystem fs, Path p,
       LocalResourceType type) throws IOException {
     LocalResource rsrc = Records.newRecord(LocalResource.class);
     FileStatus rsrcStat = fs.getFileStatus(p);
@@ -333,44 +335,34 @@ public class TezClient {
     return tezJarResources;
   }
 
-  private Configuration createFinalAMConf(TezConfiguration amConf) {
-    if (amConf == null) {
-      return new TezConfiguration();
-    } else {
-
-      Configuration conf = new Configuration(false);
-      conf.setQuietMode(true);
+  private Configuration createFinalTezConfForApp(TezConfiguration amConf) {
+    Configuration conf = new Configuration(false);
+    conf.setQuietMode(true);
 
-      Iterator<Entry<String, String>> tezConfIter = this.conf.iterator();
-      while (tezConfIter.hasNext()) {
-        Entry<String, String> entry = tezConfIter.next();
+    assert amConf != null;
+    Iterator<Entry<String, String>> iter = amConf.iterator();
+    while (iter.hasNext()) {
+      Entry<String, String> entry = iter.next();
+      // Copy all tez config parameters.
+      if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) {
         conf.set(entry.getKey(), entry.getValue());
-      }
-
-      Iterator<Entry<String, String>> iter = amConf.iterator();
-      while (iter.hasNext()) {
-        Entry<String, String> entry = iter.next();
-        // Copy all tez config parameters.
-        if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) {
-          conf.set(entry.getKey(), entry.getValue());
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding tez dag am parameter: " + entry.getKey()
-                + ", with value: " + entry.getValue());
-          }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding tez dag am parameter: " + entry.getKey()
+              + ", with value: " + entry.getValue());
         }
       }
-      return conf;
     }
+    return conf;
   }
 
   private ApplicationSubmissionContext createApplicationSubmissionContext(
       ApplicationId appId, DAG dag, Path appStagingDir, Credentials ts,
       String amQueueName, String amName, List<String> amArgs,
       Map<String, String> amEnv, Map<String, LocalResource> amLocalResources,
-      TezConfiguration amConf) throws IOException, YarnException {
+      TezConfiguration appConf) throws IOException, YarnException {
 
-    if (amConf == null) {
-      amConf = new TezConfiguration();
+    if (appConf == null) {
+      appConf = new TezConfiguration();
     }
 
     FileSystem fs = ensureExists(appStagingDir);
@@ -475,37 +467,65 @@ public class TezClient {
       v.getTaskLocalResources().putAll(tezJarResources);
     }
 
+    // emit conf as PB file
+    Configuration finalTezConf = createFinalTezConfForApp(appConf);
+    Path binaryConfPath =  new Path(appStagingDir,
+        TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
+    FSDataOutputStream amConfPBOutBinaryStream = null;
+    try {
+      ConfigurationProto.Builder confProtoBuilder = 
+          ConfigurationProto.newBuilder();
+      Iterator<Entry<String, String>> iter = finalTezConf.iterator();
+      while (iter.hasNext()) {
+        Entry<String, String> entry = iter.next();
+        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+        kvp.setKey(entry.getKey());
+        kvp.setValue(entry.getValue());
+        confProtoBuilder.addConfKeyValues(kvp);
+      }
+      //binary output
+      amConfPBOutBinaryStream = FileSystem.create(fs, binaryConfPath,
+          new FsPermission(TEZ_AM_FILE_PERMISSION));
+      confProtoBuilder.build().writeTo(amConfPBOutBinaryStream);      
+    } finally {
+      if(amConfPBOutBinaryStream != null){
+        amConfPBOutBinaryStream.close();
+      }
+    }
+    LocalResource binaryConfLr = createLocalResource(fs,
+        binaryConfPath, LocalResourceType.FILE);
+    localResources.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME, binaryConfLr);
+    
+    // Add tez conf to vertices too
+    for (Vertex v : dag.getVertices()) {
+      v.getTaskLocalResources().put(
+          TezConfiguration.TEZ_PB_BINARY_CONF_NAME, binaryConfLr);
+    }
+    
+    DAGPlan dagPB = dag.createDag(null);
     // emit protobuf DAG file style
-    Path binaryPath =  new Path(appStagingDir,
-        TezConfiguration.TEZ_AM_PLAN_PB_BINARY + "." + appId.toString());
-    amConf.set(TezConfiguration.TEZ_AM_PLAN_REMOTE_PATH, binaryPath.toUri()
-        .toString());
-
-    Configuration finalAMConf = createFinalAMConf(amConf);
-
-    DAGPlan dagPB = dag.createDag(finalAMConf);
-
+    Path binaryDAGPath =  new Path(appStagingDir,
+        TezConfiguration.TEZ_PB_PLAN_BINARY_NAME + "." + appId.toString());
     FSDataOutputStream dagPBOutBinaryStream = null;
 
     try {
       //binary output
-      dagPBOutBinaryStream = FileSystem.create(fs, binaryPath,
+      dagPBOutBinaryStream = FileSystem.create(fs, binaryDAGPath,
           new FsPermission(TEZ_AM_FILE_PERMISSION));
       dagPB.writeTo(dagPBOutBinaryStream);
     } finally {
       if(dagPBOutBinaryStream != null){
         dagPBOutBinaryStream.close();
       }
-    }
+    }    
 
-    localResources.put(TezConfiguration.TEZ_AM_PLAN_PB_BINARY,
-        createApplicationResource(fs,
-            binaryPath, LocalResourceType.FILE));
+    localResources.put(TezConfiguration.TEZ_PB_PLAN_BINARY_NAME,
+        createLocalResource(fs, binaryDAGPath, LocalResourceType.FILE));
 
     if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
       Path textPath = localizeDagPlanAsText(dagPB, fs, appStagingDir, appId);
-      localResources.put(TezConfiguration.TEZ_AM_PLAN_PB_TEXT,
-          createApplicationResource(fs, textPath, LocalResourceType.FILE));
+      localResources.put(TezConfiguration.TEZ_PB_PLAN_TEXT_NAME,
+          createLocalResource(fs, textPath, LocalResourceType.FILE));
     }
 
     Map<ApplicationAccessType, String> acls
@@ -536,7 +556,7 @@ public class TezClient {
   private Path localizeDagPlanAsText(DAGPlan dagPB, FileSystem fs,
       Path appStagingDir, ApplicationId appId) throws IOException {
     Path textPath = new Path(appStagingDir,
-        TezConfiguration.TEZ_AM_PLAN_PB_TEXT + "." + appId.toString());
+        TezConfiguration.TEZ_PB_PLAN_TEXT_NAME + "." + appId.toString());
     FSDataOutputStream dagPBOutTextStream = null;
     try {
       dagPBOutTextStream = FileSystem.create(fs, textPath, new FsPermission(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9235851/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index e027922..576bdff 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -18,7 +18,6 @@
 package org.apache.tez.dag.api;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -34,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
@@ -248,12 +248,12 @@ public class DAG { // FIXME rename to Topology
 
   // create protobuf message describing DAG
   @Private
-  public DAGPlan createDag(Configuration amConf) {
+  public DAGPlan createDag(Configuration dagConf) {
     verify(true);
 
-    DAGPlan.Builder jobBuilder = DAGPlan.newBuilder();
+    DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
 
-    jobBuilder.setName(this.name);
+    dagBuilder.setName(this.name);
 
     for(Vertex vertex : vertices){
       VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
@@ -330,7 +330,7 @@ public class DAG { // FIXME rename to Topology
       }
 
       vertexBuilder.setTaskConfig(taskConfigBuilder);
-      jobBuilder.addVertex(vertexBuilder);
+      dagBuilder.addVertex(vertexBuilder);
     }
 
     for(Edge edge : edges){
@@ -342,18 +342,22 @@ public class DAG { // FIXME rename to Topology
       edgeBuilder.setSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSourceType()));
       edgeBuilder.setEdgeSource(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeSource()));
       edgeBuilder.setEdgeDestination(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeDestination()));
-      jobBuilder.addEdge(edgeBuilder);
+      dagBuilder.addEdge(edgeBuilder);
     }
 
-    Iterator<Entry<String, String>> iter = amConf.iterator();
-    while (iter.hasNext()) {
-      Entry<String, String> entry = iter.next();
-      PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
-      kvp.setKey(entry.getKey());
-      kvp.setValue(entry.getValue());
-      jobBuilder.addJobSetting(kvp);
+    if(dagConf != null) {
+      Iterator<Entry<String, String>> iter = dagConf.iterator();
+      ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+      while (iter.hasNext()) {
+        Entry<String, String> entry = iter.next();
+        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+        kvp.setKey(entry.getKey());
+        kvp.setValue(entry.getValue());
+        confProtoBuilder.addConfKeyValues(kvp);
+      }
+      dagBuilder.setDagKeyValues(confProtoBuilder);
     }
 
-    return jobBuilder.build();
+    return dagBuilder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9235851/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index ac6e610..392558b 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
 import org.apache.tez.dag.api.EdgeProperty.SourceType;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeConnectionPattern;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSourceType;
@@ -213,8 +214,9 @@ public class DagTypeConverters {
     return Resource.newInstance(taskConfig.getMemoryMb(), taskConfig.getVirtualCores());
   }
 
-  public static Map<String, String> createSettingsMapFromDAGPlan(
-      List<PlanKeyValuePair> settingList) {
+  public static Map<String, String> convertConfFromProto(
+      ConfigurationProto confProto) {
+    List<PlanKeyValuePair> settingList = confProto.getConfKeyValuesList();
     Map<String, String> map = new HashMap<String, String>();
     for(PlanKeyValuePair setting: settingList){
       map.put(setting.getKey(), setting.getValue());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9235851/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index a8f95c4..fc6219c 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -164,8 +164,9 @@ public class TezConfiguration extends Configuration {
       + "container.reuse.non-local-fallback.enabled";
   public static final boolean TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT = false;
 
-  public static final String TEZ_AM_PLAN_PB_BINARY = "tez-dag.pb";
-  public static final String TEZ_AM_PLAN_PB_TEXT = "tez-dag.pb.txt";
+  public static final String TEZ_PB_BINARY_CONF_NAME = "tez-conf.pb";
+  public static final String TEZ_PB_PLAN_BINARY_NAME = "tez-dag.pb";
+  public static final String TEZ_PB_PLAN_TEXT_NAME = "tez-dag.pb.txt";
 
   /*
    * Logger properties

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9235851/tez-dag-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGApiRecords.proto b/tez-dag-api/src/main/proto/DAGApiRecords.proto
index b5fc5b7..85016b0 100644
--- a/tez-dag-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-dag-api/src/main/proto/DAGApiRecords.proto
@@ -117,14 +117,17 @@ message EdgePlan {
   optional TezEntityDescriptorProto edge_destination = 7;
 }
 
+message ConfigurationProto {
+  repeated PlanKeyValuePair confKeyValues = 1;
+}
+
 message DAGPlan {
   required string name = 1;
   repeated VertexPlan vertex = 2;
   repeated EdgePlan edge = 3;
-  repeated PlanKeyValuePair jobSetting = 4;
+  optional ConfigurationProto dagKeyValues = 4;
 }
 
-
 // DAG monitoring messages
 message ProgressProto {
   optional int32 totalTaskCount = 1;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9235851/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 7bbdaa4..846f2e7 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -54,6 +54,7 @@ import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -80,6 +81,7 @@ public class YarnTezDagChild {
     }
 
     final Configuration defaultConf = new Configuration();
+    TezUtils.addUserSpecifiedTezConfiguration(defaultConf);
     // Security settings will be loaded based on core-site and core-default. Don't
     // depend on the jobConf for this.
     UserGroupInformation.setConfiguration(defaultConf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d9235851/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 006cdbd..58f2e9d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -974,7 +975,8 @@ public class DAGAppMaster extends AbstractService {
       long appSubmitTime = Long.parseLong(appSubmitTimeStr);
 
       TezConfiguration conf = new TezConfiguration(new YarnConfiguration());
-
+      TezUtils.addUserSpecifiedTezConfiguration(conf);
+      
       String jobUserName = System
           .getenv(ApplicationConstants.Environment.USER.name());
 
@@ -1038,7 +1040,7 @@ public class DAGAppMaster extends AbstractService {
       // Read the protobuf DAG
       DAGPlan.Builder dagPlanBuilder = DAGPlan.newBuilder();
       dagPBBinaryStream = new FileInputStream(
-          TezConfiguration.TEZ_AM_PLAN_PB_BINARY);
+          TezConfiguration.TEZ_PB_PLAN_BINARY_NAME);
       dagPlanBuilder.mergeFrom(dagPBBinaryStream);
 
       dagPlan = dagPlanBuilder.build();
@@ -1051,9 +1053,12 @@ public class DAGAppMaster extends AbstractService {
         }
       }
 
-      Map<String, String> config = DagTypeConverters.createSettingsMapFromDAGPlan(dagPlan.getJobSettingList());
-      for(Entry<String, String> entry : config.entrySet()) {
-        conf.set(entry.getKey(), entry.getValue());
+      Map<String, String> config = DagTypeConverters.
+          convertConfFromProto(dagPlan.getDagKeyValues());
+      if(config != null) {
+        for(Entry<String, String> entry : config.entrySet()) {
+          conf.set(entry.getKey(), entry.getValue());
+        }
       }
 
       // Job name is the same as the app name until we support multiple dags


Mime
View raw message