tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1469. AM/Session LRs are not shipped to vertices in new API use-case (bikas)
Date Thu, 21 Aug 2014 01:46:24 GMT
Repository: tez
Updated Branches:
  refs/heads/master c5f2eac8d -> db6d9b29e


TEZ-1469. AM/Session LRs are not shipped to vertices in new API use-case (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/db6d9b29
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/db6d9b29
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/db6d9b29

Branch: refs/heads/master
Commit: db6d9b29e07b0337509024cc5ce9bdf9fdc6627b
Parents: c5f2eac
Author: Bikas Saha <bikas@apache.org>
Authored: Wed Aug 20 18:46:10 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Wed Aug 20 18:46:10 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/client/AMConfiguration.java  |  27 +++--
 .../java/org/apache/tez/client/TezClient.java   |  78 ++++++--------
 .../org/apache/tez/client/TezClientUtils.java   | 106 +++++++------------
 .../org/apache/tez/common/TezCommonUtils.java   |  19 +++-
 .../main/java/org/apache/tez/dag/api/DAG.java   |  26 ++++-
 .../tez/dag/api/DataSourceDescriptor.java       |  22 ++--
 .../org/apache/tez/dag/api/TezConstants.java    |   9 +-
 .../java/org/apache/tez/dag/api/Vertex.java     |  17 +--
 .../org/apache/tez/client/TestTezClient.java    |  22 ++--
 .../apache/tez/common/TestTezCommonUtils.java   |   4 +-
 .../org/apache/tez/dag/api/TestDAGPlan.java     |  18 ++--
 .../org/apache/tez/dag/api/TestDAGVerify.java   |  81 +++++++++++++-
 .../java/org/apache/tez/dag/app/AppContext.java |   3 -
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  14 +--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   1 -
 .../apache/tez/mapreduce/client/YARNRunner.java |   2 +-
 .../mapreduce/hadoop/TestMRInputHelpers.java    |  10 +-
 .../mapreduce/examples/FilterLinesByWord.java   |   4 +-
 .../examples/FilterLinesByWordOneToOne.java     |   4 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |   6 +-
 .../examples/TestOrderedWordCount.java          |   8 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |   2 +-
 23 files changed, 274 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c611668..0deb1ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -79,6 +79,7 @@ INCOMPATIBLE CHANGES
   TEZ-1455. Replace deprecated junit.framework.Assert with org.junit.Assert
   TEZ-1465. Update and document IntersectExample. Change name to JoinExample
   TEZ-1449. Change user payloads to work with a byte buffer
+  TEZ-1469. AM/Session LRs are not shipped to vertices in new API use-case
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
index 9206e72..d8cc2bf 100644
--- a/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/AMConfiguration.java
@@ -31,18 +31,18 @@ import com.google.common.collect.Maps;
 @Private
 class AMConfiguration {
 
-  private Map<String, LocalResource> localResources;
+  private Map<String, LocalResource> amLocalResources = Maps.newHashMap();
   private TezConfiguration tezConf;
   private Credentials credentials;
   private YarnConfiguration yarnConfig;
   private Map<String, String> env;
+  private LocalResource binaryConfLRsrc;
 
   AMConfiguration(TezConfiguration tezConf, Map<String, LocalResource> localResources,
       Credentials credentials) {
-    this.localResources = Maps.newHashMap();
     this.tezConf = tezConf;
     if (localResources != null) {
-      addLocalResources(localResources);
+      addAMLocalResources(localResources);
     }
     if (credentials != null) {
       setCredentials(credentials);
@@ -50,12 +50,12 @@ class AMConfiguration {
 
   }
 
-  void addLocalResources(Map<String, LocalResource> localResources) {
-    this.localResources.putAll(localResources);
+  void addAMLocalResources(Map<String, LocalResource> localResources) {
+    this.amLocalResources.putAll(localResources);
   }
   
-  void clearLocalResources() {
-    this.localResources.clear();
+  void clearAMLocalResources() {
+    this.amLocalResources.clear();
   }
   
   void setCredentials(Credentials credentials) {
@@ -74,8 +74,8 @@ class AMConfiguration {
     return this.tezConf.get(TezConfiguration.TEZ_QUEUE_NAME);
   }
 
-  Map<String, LocalResource> getLocalResources() {
-    return localResources;
+  Map<String, LocalResource> getAMLocalResources() {
+    return amLocalResources;
   }
 
   TezConfiguration getTezConfiguration() {
@@ -93,4 +93,13 @@ class AMConfiguration {
   Map<String, String> getEnv() {
     return env;
   }
+  
+  void setBinaryConfLR(LocalResource binaryConfLRsrc) {
+    this.binaryConfLRsrc = binaryConfLRsrc;
+  }
+  
+  LocalResource getBinaryConfLR() {
+    return binaryConfLRsrc;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 7250379..4b1f221 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DAGSubmissionTimedOut;
@@ -48,7 +47,6 @@ import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
@@ -159,8 +157,8 @@ public class TezClient {
    *          as app master name is session mode
    * @param tezConf
    *          Configuration for the framework
-   * @param localResources
-   *          resources for the App Master
+   * @param localFiles
+   *          local files for the App Master
    * @param credentials
    *          Set security credentials to be used inside the app master, if
    *          needed. Tez App Master needs credentials to access the staging
@@ -174,9 +172,9 @@ public class TezClient {
    *          calling start()
    */
   public static TezClient create(String name, TezConfiguration tezConf,
-                                 @Nullable Map<String, LocalResource> localResources,
+                                 @Nullable Map<String, LocalResource> localFiles,
                                  @Nullable Credentials credentials) {
-    return new TezClient(name, tezConf, localResources, credentials);
+    return new TezClient(name, tezConf, localFiles, credentials);
   }
 
   /**
@@ -195,58 +193,59 @@ public class TezClient {
   /**
    * Create a new TezClient with AM session mode set explicitly. This overrides
    * the setting from configuration.
-   * Set the initial resources and security credentials for the App Master.
+   * Set the initial files and security credentials for the App Master.
    * @param name
    *          Name of the client. Used for logging etc. This will also be used
    *          as app master name is session mode
    * @param tezConf Configuration for the framework
    * @param isSession The AM will run in session mode or not
-   * @param localResources resources for the App Master
+   * @param localFiles local files for the App Master
    * @param credentials credentials for the App Master
    */
   public static TezClient create(String name, TezConfiguration tezConf, boolean isSession,
-                                 @Nullable Map<String, LocalResource> localResources,
+                                 @Nullable Map<String, LocalResource> localFiles,
                                  @Nullable Credentials credentials) {
-    return new TezClient(name, tezConf, isSession, localResources, credentials);
+    return new TezClient(name, tezConf, isSession, localFiles, credentials);
   }
 
   /**
-   * Add local resources for the DAG App Master. <br>
+   * Add local files for the DAG App Master. These may be files, archives, 
+   * jars etc.<br>
    * <p>
-   * In non-session mode these will be added to the resources of the App Master
-   * to be launched for the next DAG. Resources added via this method will
+   * In non-session mode these will be added to the files of the App Master
+   * to be launched for the next DAG. Files added via this method will
    * accumulate and be used for every new App Master until
-   * clearAppMasterLocalResource() is invoked. <br>
+   * {@link #clearAppMasterLocalFiles()} is invoked. <br>
    * <p>
-   * In session mode, the recommended usage is to add all resources before
-   * calling start() so that all needed resources are available to the app
-   * master before it starts. When called after start(), these local resources
+   * In session mode, the recommended usage is to add all files before
+   * calling start() so that all needed files are available to the app
+   * master before it starts. When called after start(), these local files
    * will be re-localized to the running session DAG App Master and will be
    * added to its classpath for execution of this DAG.
    * <p>
-   * Caveats for invoking this method after start() in session mode: Resources
+   * Caveats for invoking this method after start() in session mode: files
    * accumulate across DAG submissions and are never removed from the classpath.
-   * Only LocalResourceType.FILE is supported. All resources will be treated as
+   * Only LocalResourceType.FILE is supported. All files will be treated as
    * private.
    * 
-   * @param localResources
+   * @param localFiles
    */
-  public synchronized void addAppMasterLocalResources(Map<String, LocalResource> localResources) {
-    Preconditions.checkNotNull(localResources);
+  public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) {
+    Preconditions.checkNotNull(localFiles);
     if (isSession && sessionStarted) {
-      additionalLocalResources.putAll(localResources);
+      additionalLocalResources.putAll(localFiles);
     }
-    amConfig.addLocalResources(localResources);
+    amConfig.addAMLocalResources(localFiles);
   }
   
   /**
-   * If the next DAG App Master needs different local resources, then use this
-   * method to clear the local resources and then add the new local resources
-   * using addAppMasterLocalResources(). This method is a no-op in session mode,
+   * If the next DAG App Master needs different local files, then use this
+   * method to clear the local files and then add the new local files
+   * using {@link #addAppMasterLocalFiles(Map)}. This method is a no-op in session mode,
    * after start() is called.
    */
-  public synchronized void clearAppMasterLocalResource() {
-    amConfig.clearLocalResources();
+  public synchronized void clearAppMasterLocalFiles() {
+    amConfig.clearAMLocalResources();
   }
   
   /**
@@ -304,7 +303,7 @@ public class TezClient {
   
         ApplicationSubmissionContext appContext =
             TezClientUtils.createApplicationSubmissionContext(
-                amConfig.getTezConfiguration(), sessionAppId,
+                sessionAppId,
                 null, clientName, amConfig,
                 tezJarResources, sessionCredentials);
   
@@ -365,19 +364,10 @@ public class TezClient {
             + lr.getType() + " is not supported, only " + LocalResourceType.FILE + " is supported");
       }
     }
-
-    // Obtain DAG specific credentials.
-    TezClientUtils.setupDAGCredentials(dag, sessionCredentials, amConfig.getTezConfiguration());
-
-    // TODO TEZ-1229 - fix jar resources
-    // setup env
-    for (Vertex v : dag.getVertices()) {
-      Map<String, String> taskEnv = v.getTaskEnvironment();
-      TezYARNUtils.setupDefaultEnv(taskEnv, amConfig.getTezConfiguration(),
-          TezConfiguration.TEZ_TASK_LAUNCH_ENV, TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT,
-          TezClientUtils.usingTezLibsFromArchive(getTezJarResources(sessionCredentials)));
-      TezClientUtils.setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration());
-    }
+    
+    Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
+    TezClientUtils.updateDAGVertices(dag, amConfig, tezJarResources,
+        TezClientUtils.usingTezLibsFromArchive(tezJarResources), sessionCredentials);
     
     DAGPlan dagPlan = dag.createDag(amConfig.getTezConfiguration());
     SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
@@ -684,7 +674,7 @@ public class TezClient {
       // Add credentials for tez-local resources.
       Map<String, LocalResource> tezJarResources = getTezJarResources(credentials);
       ApplicationSubmissionContext appContext = TezClientUtils
-          .createApplicationSubmissionContext(amConfig.getTezConfiguration(), 
+          .createApplicationSubmissionContext( 
               appId, dag, dag.getName(), amConfig, tezJarResources, credentials);
       LOG.info("Submitting DAG to YARN"
           + ", applicationId=" + appId

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 566a2d8..d931519 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -387,13 +387,13 @@ public class TezClientUtils {
    * @throws YarnException
    */
   static ApplicationSubmissionContext createApplicationSubmissionContext(
-      TezConfiguration conf, ApplicationId appId, DAG dag, String amName,
+      ApplicationId appId, DAG dag, String amName,
       AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
       Credentials sessionCreds)
           throws IOException, YarnException{
 
     Preconditions.checkNotNull(sessionCreds);
-
+    TezConfiguration conf = amConfig.getTezConfiguration();
     boolean tezLrsAsArchive = usingTezLibsFromArchive(tezJarResources);
 
     FileSystem fs = TezClientUtils.ensureStagingDirExists(conf,
@@ -434,12 +434,6 @@ public class TezClientUtils {
     amLaunchCredentials.writeTokenStorageToStream(dob);
     securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
 
-    // Need to set credentials based on DAG and the URIs which have been set for the DAG.
-
-    if (dag != null) {
-      setupDAGCredentials(dag, sessionCreds, conf);
-    }
-
     // Setup the command to run the AM
     List<String> vargs = new ArrayList<String>(8);
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
@@ -497,18 +491,17 @@ public class TezClientUtils {
       }
     }
     
-    Map<String, LocalResource> localResources =
+    Map<String, LocalResource> amLocalResources =
         new TreeMap<String, LocalResource>();
 
     // Not fetching credentials for AMLocalResources. Expect this to be provided via AMCredentials.
-    if (amConfig.getLocalResources() != null) {
-      localResources.putAll(amConfig.getLocalResources());
+    if (amConfig.getAMLocalResources() != null) {
+      amLocalResources.putAll(amConfig.getAMLocalResources());
     }
-    localResources.putAll(tezJarResources);
+    amLocalResources.putAll(tezJarResources);
 
     // emit conf as PB file
-    Configuration finalTezConf = createFinalTezConfForApp(conf,
-      amConfig.getTezConfiguration());
+    Configuration finalTezConf = createFinalTezConfForApp(amConfig.getTezConfiguration());
     
     FSDataOutputStream amConfPBOutBinaryStream = null;
     try {
@@ -535,27 +528,19 @@ public class TezClientUtils {
         TezClientUtils.createLocalResource(fs,
             binaryConfPath, LocalResourceType.FILE,
             LocalResourceVisibility.APPLICATION);
-    localResources.put(TezConstants.TEZ_PB_BINARY_CONF_NAME,
+    amConfig.setBinaryConfLR(binaryConfLRsrc);
+    amLocalResources.put(TezConstants.TEZ_PB_BINARY_CONF_NAME,
         binaryConfLRsrc);
 
     // Create Session Jars definition to be sent to AM as a local resource
-    Path sessionJarsPath = TezCommonUtils.getTezSessionJarStagingPath(tezSysStagingPath);
+    Path sessionJarsPath = TezCommonUtils.getTezAMJarStagingPath(tezSysStagingPath);
     FSDataOutputStream sessionJarsPBOutStream = null;
     try {
-      Map<String, LocalResource> sessionJars =
-        new HashMap<String, LocalResource>(tezJarResources.size() + 1);
-      sessionJars.putAll(tezJarResources);
-      sessionJars.put(TezConstants.TEZ_PB_BINARY_CONF_NAME,
-        binaryConfLRsrc);
-      DAGProtos.PlanLocalResourcesProto proto =
-        DagTypeConverters.convertFromLocalResources(sessionJars);
       sessionJarsPBOutStream = TezCommonUtils.createFileForAM(fs, sessionJarsPath);
-      proto.writeDelimitedTo(sessionJarsPBOutStream);
-      
       // Write out the initial list of resources which will be available in the AM
       DAGProtos.PlanLocalResourcesProto amResourceProto;
-      if (amConfig.getLocalResources() != null && !amConfig.getLocalResources().isEmpty()) {
-        amResourceProto = DagTypeConverters.convertFromLocalResources(localResources);
+      if (amLocalResources != null && !amLocalResources.isEmpty()) {
+        amResourceProto = DagTypeConverters.convertFromLocalResources(amLocalResources);
       } else {
         amResourceProto = DAGProtos.PlanLocalResourcesProto.getDefaultInstance(); 
       }
@@ -570,8 +555,8 @@ public class TezClientUtils {
       TezClientUtils.createLocalResource(fs,
         sessionJarsPath, LocalResourceType.FILE,
         LocalResourceVisibility.APPLICATION);
-    localResources.put(
-      TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME,
+    amLocalResources.put(
+      TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME,
       sessionJarsPBLRsrc);
 
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
@@ -579,21 +564,8 @@ public class TezClientUtils {
     Map<ApplicationAccessType, String> acls = aclManager.toYARNACls();
 
     if(dag != null) {
-
-      for (Vertex v : dag.getVertices()) {
-        if (tezJarResources != null) {
-          v.getTaskLocalFiles().putAll(tezJarResources);
-        }
-        v.getTaskLocalFiles().put(TezConstants.TEZ_PB_BINARY_CONF_NAME,
-            binaryConfLRsrc);
-
-        Map<String, String> taskEnv = v.getTaskEnvironment();
-        TezYARNUtils.setupDefaultEnv(taskEnv, conf,
-            TezConfiguration.TEZ_TASK_LAUNCH_ENV,
-            TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, tezLrsAsArchive);
-
-        TezClientUtils.setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration());
-      }
+      
+      updateDAGVertices(dag, amConfig, tezJarResources, tezLrsAsArchive, sessionCreds);
 
       // emit protobuf DAG file style
       Path binaryPath = TezCommonUtils.getTezBinPlanStagingPath(tezSysStagingPath);
@@ -602,8 +574,6 @@ public class TezClientUtils {
             + tezSysStagingPath + " binaryConfPath :" + binaryConfPath + " sessionJarsPath :"
             + sessionJarsPath + " binaryPlanPath :" + binaryPath);
       }
-      amConfig.getTezConfiguration().set(TezConstants.TEZ_AM_PLAN_REMOTE_PATH,
-          binaryPath.toUri().toString());
 
       DAGPlan dagPB = dag.createDag(amConfig.getTezConfiguration());
 
@@ -619,14 +589,14 @@ public class TezClientUtils {
         }
       }
 
-      localResources.put(TezConstants.TEZ_PB_PLAN_BINARY_NAME,
+      amLocalResources.put(TezConstants.TEZ_PB_PLAN_BINARY_NAME,
         TezClientUtils.createLocalResource(fs,
           binaryPath, LocalResourceType.FILE,
           LocalResourceVisibility.APPLICATION));
 
       if (Level.DEBUG.isGreaterOrEqual(Level.toLevel(amLogLevel))) {
         Path textPath = localizeDagPlanAsText(dagPB, fs, amConfig, strAppId, tezSysStagingPath);
-        localResources.put(TezConstants.TEZ_PB_PLAN_TEXT_NAME,
+        amLocalResources.put(TezConstants.TEZ_PB_PLAN_TEXT_NAME,
             TezClientUtils.createLocalResource(fs,
                 textPath, LocalResourceType.FILE,
                 LocalResourceVisibility.APPLICATION));
@@ -635,7 +605,7 @@ public class TezClientUtils {
 
     // Setup ContainerLaunchContext for AM container
     ContainerLaunchContext amContainer =
-        ContainerLaunchContext.newInstance(localResources, environment,
+        ContainerLaunchContext.newInstance(amLocalResources, environment,
             vargsFinal, null, securityTokens, acls);
 
     // Set up the ApplicationSubmissionContext
@@ -662,6 +632,25 @@ public class TezClientUtils {
 
   }
   
+  static void updateDAGVertices(DAG dag, AMConfiguration amConfig,
+      Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
+      Credentials credentials) throws IOException {
+    setupDAGCredentials(dag, credentials, amConfig.getTezConfiguration());
+    for (Vertex v : dag.getVertices()) {
+      if (tezJarResources != null) {
+        v.getTaskLocalFiles().putAll(tezJarResources);
+      }
+      v.getTaskLocalFiles().put(TezConstants.TEZ_PB_BINARY_CONF_NAME,
+          amConfig.getBinaryConfLR());
+
+      Map<String, String> taskEnv = v.getTaskEnvironment();
+      TezYARNUtils.setupDefaultEnv(taskEnv, amConfig.getTezConfiguration(),
+          TezConfiguration.TEZ_TASK_LAUNCH_ENV,
+          TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, tezLrsAsArchive);
+
+      setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration());
+    }
+  }
   
   static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
     if (vargs != null && !vargs.isEmpty()) {
@@ -714,29 +703,14 @@ public class TezClientUtils {
         + "," + TezConstants.TEZ_CONTAINER_LOGGER_NAME);
   }
 
-  static Configuration createFinalTezConfForApp(TezConfiguration tezConf,
-      TezConfiguration amConf) {
+  static Configuration createFinalTezConfForApp(TezConfiguration amConf) {
     Configuration conf = new Configuration(false);
     conf.setQuietMode(true);
 
-    assert tezConf != null;
     assert amConf != null;
 
     Entry<String, String> entry;
-    Iterator<Entry<String, String>> iter = tezConf.iterator();
-    while (iter.hasNext()) {
-      entry = iter.next();
-      // Copy all tez config parameters.
-      if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) {
-        conf.set(entry.getKey(), entry.getValue());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Adding tez dag am parameter from conf: " + entry.getKey()
-            + ", with value: " + entry.getValue());
-        }
-      }
-    }
-
-    iter = amConf.iterator();
+    Iterator<Entry<String, String>> iter = amConf.iterator();
     while (iter.hasNext()) {
       entry = iter.next();
       // Copy all tez config parameters.

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index 15878c3..5148770 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.StringTokenizer;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
@@ -165,8 +167,8 @@ public class TezCommonUtils {
    * @return path to store the session jars
    */
   @Private
-  public static Path getTezSessionJarStagingPath(Path tezSysStagingPath) {
-    return new Path(tezSysStagingPath, TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME);
+  public static Path getTezAMJarStagingPath(Path tezSysStagingPath) {
+    return new Path(tezSysStagingPath, TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME);
   }
 
   /**
@@ -295,6 +297,19 @@ public class TezCommonUtils {
   public static FSDataOutputStream createFileForAM(FileSystem fs, Path filePath) throws IOException {
     return FileSystem.create(fs, filePath, new FsPermission(TEZ_AM_FILE_PERMISSION));
   }
+  
+  public static void addAdditionalLocalResources(Map<String, LocalResource> additionalLrs,
+      Map<String, LocalResource> originalLRs) {
+    if (additionalLrs != null && !additionalLrs.isEmpty()) {
+      for (Map.Entry<String, LocalResource> lr : additionalLrs.entrySet()) {
+        if (originalLRs.containsKey(lr.getKey())) {
+          throw new TezUncheckedException("Attempting to add duplicate resource: " + lr.getKey());
+        } else {
+          originalLRs.put(lr.getKey(), lr.getValue());
+        }
+      }
+    }
+  }
 
   @Private
   public static ByteString compressByteArrayToByteString(byte[] inBytes) throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 5ad724c..fe73c47 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -63,6 +63,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
@@ -85,13 +86,28 @@ public class DAG {
   Set<VertexGroup> vertexGroups = Sets.newHashSet();
   Set<GroupInputEdge> groupInputEdges = Sets.newHashSet();
   private DAGAccessControls dagAccessControls;
-
+  Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
+  
   private Stack<String> topologicalVertexStack = new Stack<String>();
 
   public DAG(String name) {
     this.name = name;
   }
 
+  /**
+   * Set the files etc that must be provided to the tasks of this DAG
+   * @param localFiles
+   *          files that must be available locally for each task. These files
+   *          may be regular files, archives etc. as specified by the value
+   *          elements of the map.
+   * @return {@link DAG}
+   */
+  public DAG addTaskLocalFiles(Map<String, LocalResource> localFiles) {
+    Preconditions.checkNotNull(localFiles);
+    TezCommonUtils.addAdditionalLocalResources(localFiles, commonTaskLocalFiles);
+    return this;
+  }
+  
   public synchronized DAG addVertex(Vertex vertex) {
     if (vertices.containsKey(vertex.getName())) {
       throw new IllegalStateException(
@@ -597,7 +613,7 @@ public class DAG {
         if (dataSource.getCredentials() != null) {
           credentials.addAll(dataSource.getCredentials());
         }
-        vertex.addAdditionalLocalResources(dataSource.getAdditionalLocalResources());
+        vertex.addTaskLocalFiles(dataSource.getAdditionalLocalFiles());
       }
       if (dataSources.size() == 1) {
         DataSourceDescriptor dataSource = dataSources.get(0);
@@ -614,6 +630,9 @@ public class DAG {
         }
       }
       
+      // add common task files for this DAG
+      vertex.addTaskLocalFiles(commonTaskLocalFiles);
+        
       VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
       vertexBuilder.setName(vertex.getName());
       vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until  TEZ-46.
@@ -752,7 +771,8 @@ public class DAG {
         confProtoBuilder.addConfKeyValues(kvp);
       }
     }
-    dagBuilder.setDagKeyValues(confProtoBuilder);
+    dagBuilder.setDagKeyValues(confProtoBuilder); // This does not seem to be used anywhere
+    // should this replace BINARY_PB_CONF???
 
     if (credentials != null) {
       dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(credentials));

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
index fc9cf23..f8e6072 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
@@ -42,7 +42,7 @@ public class DataSourceDescriptor {
   private final Credentials credentials;
   private final int numShards;
   private final VertexLocationHint locationHint;
-  private final Map<String, LocalResource> additionalLocalResources;
+  private final Map<String, LocalResource> additionalLocalFiles;
 
   private DataSourceDescriptor(InputDescriptor inputDescriptor,
                                @Nullable InputInitializerDescriptor initializerDescriptor,
@@ -55,13 +55,13 @@ public class DataSourceDescriptor {
                                int numShards,
                                @Nullable Credentials credentials,
                                @Nullable VertexLocationHint locationHint,
-                               @Nullable Map<String, LocalResource> additionalLocalResources) {
+                               @Nullable Map<String, LocalResource> additionalLocalFiles) {
     this.inputDescriptor = inputDescriptor;
     this.initializerDescriptor = initializerDescriptor;
     this.numShards = numShards;
     this.credentials = credentials;
     this.locationHint = locationHint;
-    this.additionalLocalResources = additionalLocalResources;
+    this.additionalLocalFiles = additionalLocalFiles;
   }
 
   /**
@@ -104,19 +104,19 @@ public class DataSourceDescriptor {
    * @param numShards                Number of shards of data
    * @param credentials              Credentials needed to access the data
    * @param locationHint             Location hints for the vertex tasks
-   * @param additionalLocalResources additional local resources required by this Input. An attempt
-   *                                 will be made to add these resources to the Vertex as Private
+   * @param additionalLocalFiles     additional local files required by this Input. An attempt
+   *                                 will be made to add these files to the Vertex as Private
    *                                 resources. If a name conflict occurs, a {@link
-   *                                 org.apache.tez.dag.api.TezException} will be thrown
+   *                                 org.apache.tez.dag.api.TezUncheckedException} will be thrown
    */
   public static DataSourceDescriptor create(InputDescriptor inputDescriptor,
                                             @Nullable InputInitializerDescriptor initializerDescriptor,
                                             int numShards,
                                             @Nullable Credentials credentials,
                                             @Nullable VertexLocationHint locationHint,
-                                            @Nullable Map<String, LocalResource> additionalLocalResources) {
+                                            @Nullable Map<String, LocalResource> additionalLocalFiles) {
     return new DataSourceDescriptor(inputDescriptor, initializerDescriptor, numShards, credentials,
-        locationHint, additionalLocalResources);
+        locationHint, additionalLocalFiles);
   }
 
   public InputDescriptor getInputDescriptor() {
@@ -160,12 +160,12 @@ public class DataSourceDescriptor {
   }
 
   /**
-   * Get the list of additional local resources which were specified during creation.
+   * Get the map of additional local files which were specified during creation.
    * @return
    */
   @InterfaceAudience.Private
-  public @Nullable Map<String, LocalResource> getAdditionalLocalResources() {
-    return additionalLocalResources;
+  public @Nullable Map<String, LocalResource> getAdditionalLocalFiles() {
+    return additionalLocalFiles;
   }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index ba634af..8af7e84 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -63,7 +63,7 @@ public class TezConstants {
   public static final String TEZ_CONTAINER_ERR_FILE_NAME = "stderr";
   public static final String TEZ_CONTAINER_OUT_FILE_NAME = "stdout";
 
-  public static final String TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME =
+  public static final String TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME =
     TezConfiguration.TEZ_SESSION_PREFIX + "local-resources.pb";
   
   public static final String TEZ_APPLICATION_TYPE = "TEZ";
@@ -83,13 +83,6 @@ public class TezConstants {
 
   // Configuration keys used internally and not set by the users
   
-  /**
-   * The complete path to the serialized dag plan file
-   * <code>TEZ_AM_PLAN_PB_BINARY</code>. Used to make the plan available to
-   * individual tasks if needed. This will be inside the staging dir
-   */
-  public static final String TEZ_AM_PLAN_REMOTE_PATH = "dag-am-plan.remote.path";
-
   // These are session specific DAG ACL's. Currently here because these can only be specified
   // via code in the API.
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 5e15e16..6397c82 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.VertexGroup.GroupInfo;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
@@ -247,9 +248,9 @@ public class Vertex {
    *          elements of the map.
    * @return this Vertex
    */
-  public Vertex setTaskLocalFiles(Map<String, LocalResource> localFiles) {
+  public Vertex addTaskLocalFiles(Map<String, LocalResource> localFiles) {
     if (localFiles != null) {
-      this.taskLocalResources.putAll(localFiles);
+      TezCommonUtils.addAdditionalLocalResources(localFiles, taskLocalResources);
     }
     return this;
   }
@@ -428,18 +429,6 @@ public class Vertex {
     return Collections.unmodifiableList(outputVertices);
   }
 
-  void addAdditionalLocalResources(Map<String, LocalResource> additionalLrs) {
-    if (additionalLrs != null && !additionalLrs.isEmpty()) {
-      for (Map.Entry<String, LocalResource> lr : additionalLrs.entrySet()) {
-        if (taskLocalResources.containsKey(lr.getKey())) {
-          throw new TezUncheckedException("Attempting to add duplicate resource: " + lr.getKey());
-        } else {
-          taskLocalResources.put(lr.getKey(), lr.getValue());
-        }
-      }
-    }
-  }
-
   /**
    * Set the cpu/memory etc resources used by tasks of this vertex
    * @param resource {@link Resource} for the tasks of this vertex

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index ddddb16..e05b004 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -19,6 +19,7 @@
 package org.apache.tez.client;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -134,7 +135,7 @@ public class TestTezClient {
       ApplicationSubmissionContext context = captor.getValue();
       Assert.assertEquals(3, context.getAMContainerSpec().getLocalResources().size());
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
-          TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME));
+          TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           TezConstants.TEZ_PB_BINARY_CONF_NAME));
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
@@ -143,10 +144,19 @@ public class TestTezClient {
       verify(client.mockYarnClient, times(0)).submitApplication(captor.capture());
     }
     
-    DAG dag = new DAG("DAG").addVertex(
-        Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1, Resource.newInstance(1, 1)));
+    String mockLR1Name = "LR1";
+    Map<String, LocalResource> lrDAG = Collections.singletonMap(mockLR1Name, LocalResource
+        .newInstance(URL.newInstance("file:///", "localhost", 0, "test"), LocalResourceType.FILE,
+            LocalResourceVisibility.PUBLIC, 1, 1));
+    Vertex vertex = Vertex.create("Vertex", ProcessorDescriptor.create("P"), 1,
+        Resource.newInstance(1, 1));
+    DAG dag = new DAG("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG);
     DAGClient dagClient = client.submitDAG(dag);
     
+    // verify that the DAG-level local resource (LR1) is added to the vertex
+    Map<String, LocalResource> vertexLR = vertex.getTaskLocalFiles();
+    Assert.assertTrue(vertexLR.containsKey(mockLR1Name));
+    
     Assert.assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString()));
     
     if (isSession) {
@@ -157,7 +167,7 @@ public class TestTezClient {
       ApplicationSubmissionContext context = captor.getValue();
       Assert.assertEquals(4, context.getAMContainerSpec().getLocalResources().size());
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
-          TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME));
+          TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           TezConstants.TEZ_PB_BINARY_CONF_NAME));
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
@@ -171,7 +181,7 @@ public class TestTezClient {
     lrs.clear();
     lrs.put(lrName2, LocalResource.newInstance(URL.newInstance("file:///", "localhost", 0, "test"),
         LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
-    client.addAppMasterLocalResources(lrs);
+    client.addAppMasterLocalFiles(lrs);
     
     ApplicationId appId2 = ApplicationId.newInstance(0, 2);
     when(client.mockYarnClient.createApplication().getNewApplicationResponse().getApplicationId())
@@ -199,7 +209,7 @@ public class TestTezClient {
       ApplicationSubmissionContext context = captor.getValue();
       Assert.assertEquals(5, context.getAMContainerSpec().getLocalResources().size());
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
-          TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME));
+          TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(
           TezConstants.TEZ_PB_BINARY_CONF_NAME));
       Assert.assertTrue(context.getAMContainerSpec().getLocalResources().containsKey(

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
index 27f798d..e0975ab 100644
--- a/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
+++ b/tez-api/src/test/java/org/apache/tez/common/TestTezCommonUtils.java
@@ -136,10 +136,10 @@ public class TestTezCommonUtils {
   public void testTezSessionJarStagingPath() throws Exception {
     String strAppId = "testAppId";
     Path stageDir = TezCommonUtils.getTezSystemStagingPath(conf, strAppId);
-    Path confStageDir = TezCommonUtils.getTezSessionJarStagingPath(stageDir);
+    Path confStageDir = TezCommonUtils.getTezAMJarStagingPath(stageDir);
     String expectedDir = RESOLVED_STAGE_DIR + File.separatorChar
         + TezCommonUtils.TEZ_SYSTEM_SUB_DIR + File.separatorChar + strAppId + File.separator
-        + TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME;
+        + TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME;
     Assert.assertEquals(confStageDir.toString(), expectedDir);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 6a74553..fbea24b 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -108,9 +108,9 @@ public class TestDAGPlan {
     Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1));
     Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1));
     v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+        .addTaskLocalFiles(new HashMap<String, LocalResource>());
     v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+        .addTaskLocalFiles(new HashMap<String, LocalResource>());
 
     InputDescriptor inputDescriptor = InputDescriptor.create("input")
         .setUserPayload(UserPayload.create(ByteBuffer.wrap("inputBytes".getBytes())));
@@ -144,9 +144,9 @@ public class TestDAGPlan {
     Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1));
     Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1));
     v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+        .addTaskLocalFiles(new HashMap<String, LocalResource>());
     v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+        .addTaskLocalFiles(new HashMap<String, LocalResource>());
 
     InputDescriptor inputDescriptor = InputDescriptor.create("input").
         setUserPayload(UserPayload.create(ByteBuffer.wrap("inputBytes".getBytes())));
@@ -208,11 +208,11 @@ public class TestDAGPlan {
     Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1));
     Vertex v3 = Vertex.create("v3", pd3, 1, Resource.newInstance(1024, 1));
     v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+        .addTaskLocalFiles(new HashMap<String, LocalResource>());
     v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+        .addTaskLocalFiles(new HashMap<String, LocalResource>());
     v3.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+        .addTaskLocalFiles(new HashMap<String, LocalResource>());
 
     InputDescriptor inputDescriptor = InputDescriptor.create("input").
         setUserPayload(UserPayload.create(ByteBuffer.wrap("inputBytes".getBytes())));
@@ -278,9 +278,9 @@ public class TestDAGPlan {
     Vertex v1 = Vertex.create("v1", pd1, 10, Resource.newInstance(1024, 1));
     Vertex v2 = Vertex.create("v2", pd2, 1, Resource.newInstance(1024, 1));
     v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+        .addTaskLocalFiles(new HashMap<String, LocalResource>());
     v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+        .addTaskLocalFiles(new HashMap<String, LocalResource>());
 
     InputDescriptor inputDescriptor = InputDescriptor.create("input").
         setUserPayload(UserPayload.create(ByteBuffer.wrap("inputBytes".getBytes())));

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 9885272..033d08b 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -19,10 +19,15 @@
 package org.apache.tez.dag.api;
 
 import java.util.Arrays;
+import java.util.Map;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
@@ -33,6 +38,8 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Maps;
+
 public class TestDAGVerify {
 
   private final String dummyProcessorClassName = TestDAGVerify.class.getName();
@@ -896,8 +903,80 @@ public class TestDAGVerify {
 
     dag.createDag(new TezConfiguration());
   }
+  
+  
+  @Test(timeout = 5000)
+  public void testVerifyCommonFiles() {
+    Vertex v1 = Vertex.create("v1",
+        ProcessorDescriptor.create(dummyProcessorClassName),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2",
+        ProcessorDescriptor.create("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = Edge.create(v1, v2,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            OutputDescriptor.create(dummyOutputClassName),
+            InputDescriptor.create(dummyInputClassName)));
+    Map<String, LocalResource> lrs = Maps.newHashMap();
+    String lrName1 = "LR1";
+    lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file:///", "localhost", 0, "test"),
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+    
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(e1);
+    dag.addTaskLocalFiles(lrs);
+    dag.createDag(new TezConfiguration());
+    Assert.assertTrue(v1.getTaskLocalFiles().containsKey(lrName1));
+    Assert.assertTrue(v2.getTaskLocalFiles().containsKey(lrName1));
+  }
 
-
+  @Test(timeout = 5000)
+  public void testVerifyCommonFilesFail() {
+    Vertex v1 = Vertex.create("v1",
+        ProcessorDescriptor.create(dummyProcessorClassName),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2",
+        ProcessorDescriptor.create("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = Edge.create(v1, v2,
+        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            OutputDescriptor.create(dummyOutputClassName),
+            InputDescriptor.create(dummyInputClassName)));
+    Map<String, LocalResource> lrs = Maps.newHashMap();
+    String lrName1 = "LR1";
+    lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file:///", "localhost", 0, "test"),
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+    v1.addTaskLocalFiles(lrs);
+    try {
+      v1.addTaskLocalFiles(lrs);
+      Assert.fail();
+    } catch (TezUncheckedException e) {
+      Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate resource"));
+    }
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(e1);
+    dag.addTaskLocalFiles(lrs);
+    try {
+      dag.addTaskLocalFiles(lrs);
+      Assert.fail();
+    } catch (TezUncheckedException e) {
+      Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate resource"));
+    }
+    try {
+      // dag will add duplicate common files to vertex
+      dag.createDag(new TezConfiguration());
+      Assert.fail();
+    } catch (TezUncheckedException e) {
+      Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate resource"));
+    }
+  }
+  
   @Test(timeout = 5000)
   public void testDAGAccessControls() {
     DAG dag = new DAG("testDag");

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 89c468b..9cc28cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.dag.app.dag.DAG;
@@ -79,8 +78,6 @@ public interface AppContext {
 
   TaskSchedulerEventHandler getTaskScheduler();
 
-  Map<String, LocalResource> getSessionResources();
-
   boolean isSession();
 
   DAGAppMasterState getAMState();

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index fa4b629..62004d2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -216,8 +216,6 @@ public class DAGAppMaster extends AbstractService {
   private HistoryEventHandler historyEventHandler;
   private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>();
   private final Map<String, LocalResource> cumulativeAdditionalResources = new HashMap<String, LocalResource>();
-  private final Map<String, LocalResource> sessionResources =
-    new HashMap<String, LocalResource>();
 
   private boolean isLocal = false; //Local mode flag
 
@@ -398,15 +396,10 @@ public class DAGAppMaster extends AbstractService {
       FileInputStream sessionResourcesStream = null;
       try {
         sessionResourcesStream = new FileInputStream(
-          new File(workingDirectory, TezConstants.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME));
-        PlanLocalResourcesProto sessionLocalResourcesProto =
-          PlanLocalResourcesProto.parseDelimitedFrom(sessionResourcesStream);
+          new File(workingDirectory, TezConstants.TEZ_AM_LOCAL_RESOURCES_PB_FILE_NAME));
         PlanLocalResourcesProto amLocalResourceProto = PlanLocalResourcesProto
             .parseDelimitedFrom(sessionResourcesStream);
-        sessionResources.putAll(DagTypeConverters.convertFromPlanLocalResources(
-          sessionLocalResourcesProto));
         amResources.putAll(DagTypeConverters.convertFromPlanLocalResources(amLocalResourceProto));
-        amResources.putAll(sessionResources);
       } finally {
         if (sessionResourcesStream != null) {
           sessionResourcesStream.close();
@@ -1192,11 +1185,6 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
-    public Map<String, LocalResource> getSessionResources() {
-      return sessionResources;
-    }
-
-    @Override
     public boolean isSession() {
       return isSession;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6387b8f..1fb40a0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -648,7 +648,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.localResources = DagTypeConverters
         .createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig()
             .getLocalResourceList());
-    this.localResources.putAll(appContext.getSessionResources());
     this.environment = DagTypeConverters
         .createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig()
             .getEnvironmentSettingList());

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 94d5818..a966316 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -444,7 +444,7 @@ public class YARNRunner implements ClientProtocol {
         : MRHelpers.getJavaOptsForMRReducer(stageConf);
 
     vertex.setTaskEnvironment(taskEnv)
-        .setTaskLocalFiles(taskLocalResources)
+        .addTaskLocalFiles(taskLocalResources)
         .setLocationHint(VertexLocationHint.create(locations))
         .setTaskLaunchCmdOpts(taskJavaOpts);
     

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java
index 3f9368f..c5f81a9 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRInputHelpers.java
@@ -104,9 +104,9 @@ public class TestMRInputHelpers {
 
     DataSourceDescriptor dataSource = generateDataSourceDescriptorMapReduce(newSplitsDir);
 
-    Assert.assertTrue(dataSource.getAdditionalLocalResources()
+    Assert.assertTrue(dataSource.getAdditionalLocalFiles()
         .containsKey(MRInputHelpers.JOB_SPLIT_RESOURCE_NAME));
-    Assert.assertTrue(dataSource.getAdditionalLocalResources()
+    Assert.assertTrue(dataSource.getAdditionalLocalFiles()
         .containsKey(MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));
 
     RemoteIterator<LocatedFileStatus> files =
@@ -141,8 +141,8 @@ public class TestMRInputHelpers {
   public void testOldSplitsGen() throws Exception {
     DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(oldSplitsDir);
     Assert.assertTrue(
-        dataSource.getAdditionalLocalResources().containsKey(MRInputHelpers.JOB_SPLIT_RESOURCE_NAME));
-    Assert.assertTrue(dataSource.getAdditionalLocalResources()
+        dataSource.getAdditionalLocalFiles().containsKey(MRInputHelpers.JOB_SPLIT_RESOURCE_NAME));
+    Assert.assertTrue(dataSource.getAdditionalLocalFiles()
         .containsKey(MRInputHelpers.JOB_SPLIT_METAINFO_RESOURCE_NAME));
 
     RemoteIterator<LocatedFileStatus> files =
@@ -177,7 +177,7 @@ public class TestMRInputHelpers {
   public void testInputSplitLocalResourceCreation() throws Exception {
     DataSourceDescriptor dataSource = generateDataSourceDescriptorMapRed(oldSplitsDir);
 
-    Map<String, LocalResource> localResources = dataSource.getAdditionalLocalResources();
+    Map<String, LocalResource> localResources = dataSource.getAdditionalLocalFiles();
 
     Assert.assertEquals(2, localResources.size());
     Assert.assertTrue(localResources.containsKey(

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 875bfcb..d9304e4 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -171,7 +171,7 @@ public class FilterLinesByWord extends Configured implements Tool {
     // Setup stage1 Vertex
     Vertex stage1Vertex = Vertex.create("stage1", ProcessorDescriptor.create(
         FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
-        .setTaskLocalFiles(commonLocalResources);
+        .addTaskLocalFiles(commonLocalResources);
 
     DataSourceDescriptor dsd;
     if (generateSplitsInClient) {
@@ -189,7 +189,7 @@ public class FilterLinesByWord extends Configured implements Tool {
     Vertex stage2Vertex = Vertex.create("stage2", ProcessorDescriptor.create(
         FilterByWordOutputProcessor.class.getName()).setUserPayload(
         TezUtils.createUserPayloadFromConf(stage2Conf)), 1);
-    stage2Vertex.setTaskLocalFiles(commonLocalResources);
+    stage2Vertex.addTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2
     OutputDescriptor od = OutputDescriptor.create(MROutput.class.getName())

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 61d827e..ca2a288 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -162,7 +162,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     // Setup stage1 Vertex
     Vertex stage1Vertex = Vertex.create("stage1", ProcessorDescriptor.create(
         FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
-        .setTaskLocalFiles(commonLocalResources);
+        .addTaskLocalFiles(commonLocalResources);
 
     DataSourceDescriptor dsd;
     if (generateSplitsInClient) {
@@ -180,7 +180,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     Vertex stage2Vertex = Vertex.create("stage2", ProcessorDescriptor.create(
         FilterByWordOutputProcessor.class.getName()).setUserPayload(TezUtils
         .createUserPayloadFromConf(stage2Conf)), dsd.getNumberOfShards());
-    stage2Vertex.setTaskLocalFiles(commonLocalResources);
+    stage2Vertex.addTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2
     stage2Vertex.addDataSink(

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index e4c515b..38a1045 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -526,7 +526,7 @@ public class MRRSleepJob extends Configured implements Tool {
 
     Vertex mapVertex = Vertex.create("map", ProcessorDescriptor.create(
         MapProcessor.class.getName()).setUserPayload(mapUserPayload), numTasks)
-        .setTaskLocalFiles(commonLocalResources);
+        .addTaskLocalFiles(commonLocalResources);
     mapVertex.addDataSource("MRInput", dataSource);
     vertices.add(mapVertex);
 
@@ -539,7 +539,7 @@ public class MRRSleepJob extends Configured implements Tool {
         Vertex ivertex = Vertex.create("ireduce" + (i + 1),
             ProcessorDescriptor.create(ReduceProcessor.class.getName()).
                 setUserPayload(iReduceUserPayload), numIReducer);
-        ivertex.setTaskLocalFiles(commonLocalResources);
+        ivertex.addTaskLocalFiles(commonLocalResources);
         vertices.add(ivertex);
       }
     }
@@ -549,7 +549,7 @@ public class MRRSleepJob extends Configured implements Tool {
       UserPayload reducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
       finalReduceVertex = Vertex.create("reduce", ProcessorDescriptor.create(
           ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer);
-      finalReduceVertex.setTaskLocalFiles(commonLocalResources);
+      finalReduceVertex.addTaskLocalFiles(commonLocalResources);
       finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigBuilder(finalReduceConf,
           NullOutputFormat.class).build());
       vertices.add(finalReduceVertex);

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 3c4e66f..6157cbd 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -199,7 +199,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
     Vertex mapVertex = Vertex.create("initialmap", ProcessorDescriptor.create(
         MapProcessor.class.getName()).setUserPayload(
         TezUtils.createUserPayloadFromConf(mapStageConf))
-        .setHistoryText(mapStageHistoryText)).setTaskLocalFiles(commonLocalResources);
+        .setHistoryText(mapStageHistoryText)).addTaskLocalFiles(commonLocalResources);
     mapVertex.addDataSource("MRInput", dsd);
     vertices.add(mapVertex);
 
@@ -210,7 +210,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
         ReduceProcessor.class.getName())
         .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
         .setHistoryText(iReduceStageHistoryText), 2);
-    ivertex.setTaskLocalFiles(commonLocalResources);
+    ivertex.addTaskLocalFiles(commonLocalResources);
     vertices.add(ivertex);
 
     ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
@@ -222,7 +222,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
             ReduceProcessor.class.getName())
             .setUserPayload(finalReducePayload)
             .setHistoryText(finalReduceStageHistoryText), 1);
-    finalReduceVertex.setTaskLocalFiles(commonLocalResources);
+    finalReduceVertex.addTaskLocalFiles(commonLocalResources);
     finalReduceVertex.addDataSink("MROutput",
         MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath)
             .build());
@@ -386,7 +386,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
           LOG.info("Pre-warming Session");
           PreWarmVertex preWarmVertex = PreWarmVertex.create("PreWarm", preWarmNumContainers, dag
               .getVertex("initialmap").getTaskResource());
-          preWarmVertex.setTaskLocalFiles(dag.getVertex("initialmap").getTaskLocalFiles());
+          preWarmVertex.addTaskLocalFiles(dag.getVertex("initialmap").getTaskLocalFiles());
           preWarmVertex.setTaskEnvironment(dag.getVertex("initialmap").getTaskEnvironment());
           preWarmVertex.setTaskLaunchCmdOpts(dag.getVertex("initialmap").getTaskLaunchCmdOpts());
           

http://git-wip-us.apache.org/repos/asf/tez/blob/db6d9b29/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 49de6c1..48faefe 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -785,7 +785,7 @@ public class TestMRRJobsDAGApi {
       LOG.info("Submitting dag to tez session with appId=" + tezSession.getAppMasterApplicationId()
           + " and Dag Name=" + dag.getName());
       if (additionalLocalResources != null) {
-        tezSession.addAppMasterLocalResources(additionalLocalResources);
+        tezSession.addAppMasterLocalFiles(additionalLocalResources);
       }
       dagClient = tezSession.submitDAG(dag);
       Assert.assertEquals(TezAppMasterStatus.RUNNING,


Mime
View raw message