tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1130. Replace confusing names on Vertex API (bikas)
Date Fri, 11 Jul 2014 20:47:32 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 2316c1dc9 -> cfe9a42a9


TEZ-1130. Replace confusing names on Vertex API (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cfe9a42a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cfe9a42a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cfe9a42a

Branch: refs/heads/master
Commit: cfe9a42a9bd08b983d60d3823d9ed47c15ebdcf2
Parents: 2316c1d
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Jul 11 13:47:20 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Jul 11 13:47:20 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/client/TezClientUtils.java   |  10 +-
 .../main/java/org/apache/tez/dag/api/DAG.java   |  54 ++++-----
 .../java/org/apache/tez/dag/api/Vertex.java     | 114 +++++++++++++++++--
 .../org/apache/tez/dag/api/VertexGroup.java     |   2 +-
 .../dag/api/client/rpc/DAGClientRPCImpl.java    |   2 +-
 .../org/apache/tez/dag/api/TestDAGPlan.java     |  36 +++---
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   4 +-
 .../mapreduce/examples/FilterLinesByWord.java   |   6 +-
 .../examples/FilterLinesByWordOneToOne.java     |   6 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |   8 +-
 .../mapreduce/examples/OrderedWordCount.java    |  12 +-
 .../apache/tez/mapreduce/client/YARNRunner.java |   8 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |   2 +-
 14 files changed, 178 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a372031..ea0447f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@ INCOMPATIBLE CHANGES
   TEZ-1127. Add TEZ_TASK_JAVA_OPTS and TEZ_ENV configs to specify values from
   config
   TEZ-692. Unify job submission in either TezClient or TezSession
+  TEZ-1130. Replace confusing names on Vertex API
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 4711589..166f091 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -293,7 +293,7 @@ public class TezClientUtils {
     try {
       Set<Path> lrPaths = new HashSet<Path>();
       for (Vertex v: dag.getVertices()) {
-        for (LocalResource lr: v.getTaskLocalResources().values()) {
+        for (LocalResource lr: v.getTaskLocalFiles().values()) {
           lrPaths.add(ConverterUtils.getPathFromYarnURL(lr.getResource()));
         }
       }
@@ -509,9 +509,9 @@ public class TezClientUtils {
 
       for (Vertex v : dag.getVertices()) {
         if (tezJarResources != null) {
-          v.getTaskLocalResources().putAll(tezJarResources);
+          v.getTaskLocalFiles().putAll(tezJarResources);
         }
-        v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+        v.getTaskLocalFiles().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
             binaryConfLRsrc);
 
         Map<String, String> taskEnv = v.getTaskEnvironment();
@@ -618,7 +618,7 @@ public class TezClientUtils {
   }
   
   static void setDefaultLaunchCmdOpts(Vertex v, TezConfiguration conf) {
-    String vOpts = v.getJavaOpts();
+    String vOpts = v.getTaskLaunchCmdOpts();
     String vConfigOpts = conf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS,
         TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS_DEFAULT);
     if (vConfigOpts != null && vConfigOpts.length() > 0) {
@@ -628,7 +628,7 @@ public class TezClientUtils {
     vOpts = maybeAddDefaultLoggingJavaOpts(conf.get(
         TezConfiguration.TEZ_TASK_LOG_LEVEL,
         TezConfiguration.TEZ_TASK_LOG_LEVEL_DEFAULT), vOpts);
-    v.setJavaOpts(vOpts);
+    v.setTaskLaunchCmdOpts(vOpts);
   }
 
   @Private

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 4772fae..30793c2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -80,11 +80,11 @@ public class DAG {
   }
 
   public synchronized DAG addVertex(Vertex vertex) {
-    if (vertices.containsKey(vertex.getVertexName())) {
+    if (vertices.containsKey(vertex.getName())) {
       throw new IllegalStateException(
-        "Vertex " + vertex.getVertexName() + " already defined!");
+        "Vertex " + vertex.getName() + " already defined!");
     }
-    vertices.put(vertex.getVertexName(), vertex);
+    vertices.put(vertex.getName(), vertex);
     return this;
   }
 
@@ -260,8 +260,8 @@ public class DAG {
             Vertex outVertex = e.getOutputVertex();
             if (outVertex.getParallelism() == -1) {
               LOG.info("Inferring parallelism for vertex: "
-                  + outVertex.getVertexName() + " to be " + v.getParallelism()
-                  + " from 1-1 connection with vertex " + v.getVertexName());
+                  + outVertex.getName() + " to be " + v.getParallelism()
+                  + " from 1-1 connection with vertex " + v.getName());
               outVertex.setParallelism(v.getParallelism());
               newKnownTasksVertices.add(outVertex);
             }
@@ -281,8 +281,8 @@ public class DAG {
           if (outputVertex.getParallelism() != -1) {
             throw new TezUncheckedException(
                 "1-1 Edge. Destination vertex parallelism must match source vertex. "
-                + "Vertex: " + inputVertex.getVertexName() + " does not match vertex: " 
-                + outputVertex.getVertexName());
+                + "Vertex: " + inputVertex.getName() + " does not match vertex: " 
+                + outputVertex.getName());
           }
         }
       }
@@ -341,11 +341,11 @@ public class DAG {
     Map<Vertex, Set<String>> inboundVertexMap = new HashMap<Vertex, Set<String>>();
     Map<Vertex, Set<String>> outboundVertexMap = new HashMap<Vertex, Set<String>>();
     for (Vertex v : vertices.values()) {
-      if (vertexMap.containsKey(v.getVertexName())) {
+      if (vertexMap.containsKey(v.getName())) {
         throw new IllegalStateException("DAG contains multiple vertices"
-          + " with name: " + v.getVertexName());
+          + " with name: " + v.getName());
       }
-      vertexMap.put(v.getVertexName(), new AnnotatedVertex(v));
+      vertexMap.put(v.getName(), new AnnotatedVertex(v));
     }
 
     Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>();
@@ -366,7 +366,7 @@ public class DAG {
         inboundSet = new HashSet<String>();
         inboundVertexMap.put(outputVertex, inboundSet);
       }
-      inboundSet.add(inputVertex.getVertexName());
+      inboundSet.add(inputVertex.getName());
       
       // Construct map for Output name verification
       Set<String> outboundSet = outboundVertexMap.get(inputVertex);
@@ -374,7 +374,7 @@ public class DAG {
         outboundSet = new HashSet<String>();
         outboundVertexMap.put(inputVertex, outboundSet);
       }
-      outboundSet.add(outputVertex.getVertexName());
+      outboundSet.add(outputVertex.getName());
     }
 
     // check input and output names don't collide with vertex names
@@ -382,7 +382,7 @@ public class DAG {
       for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
         if (vertexMap.containsKey(input.getName())) {
           throw new IllegalStateException("Vertex: "
-              + vertex.getVertexName()
+              + vertex.getName()
               + " contains an Input with the same name as vertex: "
               + input.getName());
         }
@@ -390,7 +390,7 @@ public class DAG {
       for (RootInputLeafOutput<OutputDescriptor> output : vertex.getOutputs()) {
         if (vertexMap.containsKey(output.getName())) {
           throw new IllegalStateException("Vertex: "
-              + vertex.getVertexName()
+              + vertex.getName()
               + " contains an Output with the same name as vertex: "
               + output.getName());
         }
@@ -403,7 +403,7 @@ public class DAG {
       for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
         if (entry.getValue().contains(input.getName())) {
           throw new IllegalStateException("Vertex: "
-              + vertex.getVertexName()
+              + vertex.getName()
               + " contains an incoming vertex and Input with the same name: "
               + input.getName());
         }
@@ -416,7 +416,7 @@ public class DAG {
       for (RootInputLeafOutput<OutputDescriptor> output : vertex.getOutputs()) {
         if (entry.getValue().contains(output.getName())) {
           throw new IllegalStateException("Vertex: "
-              + vertex.getVertexName()
+              + vertex.getName()
               + " contains an outgoing vertex and Output with the same name: "
               + output.getName());
         }
@@ -479,7 +479,7 @@ public class DAG {
     List<Edge> edges = edgeMap.get(av.v);
     if (edges != null) {
       for (Edge e : edgeMap.get(av.v)) {
-        AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getVertexName());
+        AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getName());
         if (outVertex.index == -1) {
           strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
           av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
@@ -498,12 +498,12 @@ public class DAG {
         // there was something on the stack other than this "av".
         // this indicates there is a scc/cycle. It comprises all nodes from top of stack to "av"
         StringBuilder message = new StringBuilder();
-        message.append(av.v.getVertexName()).append(" <- ");
+        message.append(av.v.getName()).append(" <- ");
         for (; pop != av; pop = stack.pop()) {
-          message.append(pop.v.getVertexName()).append(" <- ");
+          message.append(pop.v.getName()).append(" <- ");
           pop.onstack = false;
         }
-        message.append(av.v.getVertexName());
+        message.append(av.v.getName());
         throw new IllegalStateException("DAG contains a cycle: " + message);
       }
     }
@@ -525,7 +525,7 @@ public class DAG {
         PlanVertexGroupInfo.Builder groupBuilder = PlanVertexGroupInfo.newBuilder();
         groupBuilder.setGroupName(groupInfo.getGroupName());
         for (Vertex v : groupInfo.getMembers()) {
-          groupBuilder.addGroupMembers(v.getVertexName());
+          groupBuilder.addGroupMembers(v.getName());
         }
         groupBuilder.addAllOutputs(groupInfo.outputs);
         for (Map.Entry<String, InputDescriptor> entry : 
@@ -540,7 +540,7 @@ public class DAG {
 
     for (Vertex vertex : vertices.values()) {
       VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
-      vertexBuilder.setName(vertex.getVertexName());
+      vertexBuilder.setName(vertex.getName());
       vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46.
       vertexBuilder.setProcessorDescriptor(DagTypeConverters
         .convertToDAGPlan(vertex.getProcessorDescriptor()));
@@ -561,13 +561,13 @@ public class DAG {
       taskConfigBuilder.setNumTasks(vertex.getParallelism());
       taskConfigBuilder.setMemoryMb(resource.getMemory());
       taskConfigBuilder.setVirtualCores(resource.getVirtualCores());
-      taskConfigBuilder.setJavaOpts(vertex.getJavaOpts());
+      taskConfigBuilder.setJavaOpts(vertex.getTaskLaunchCmdOpts());
 
-      taskConfigBuilder.setTaskModule(vertex.getVertexName());
+      taskConfigBuilder.setTaskModule(vertex.getName());
       PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
       localResourcesBuilder.clear();
       for (Entry<String, LocalResource> entry :
-             vertex.getTaskLocalResources().entrySet()) {
+             vertex.getTaskLocalFiles().entrySet()) {
         String key = entry.getKey();
         LocalResource lr = entry.getValue();
         localResourcesBuilder.setName(key);
@@ -637,8 +637,8 @@ public class DAG {
     for (Edge edge : edges) {
       EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder();
       edgeBuilder.setId(edge.getId());
-      edgeBuilder.setInputVertexName(edge.getInputVertex().getVertexName());
-      edgeBuilder.setOutputVertexName(edge.getOutputVertex().getVertexName());
+      edgeBuilder.setInputVertexName(edge.getInputVertex().getName());
+      edgeBuilder.setOutputVertexName(edge.getOutputVertex().getName());
       edgeBuilder.setDataMovementType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataMovementType()));
       edgeBuilder.setDataSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataSourceType()));
       edgeBuilder.setSchedulingType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSchedulingType()));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 007884a..00b10ef 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -58,8 +58,24 @@ public class Vertex {
   private final List<Edge> outputEdges = new ArrayList<Edge>();
   private final Map<String, GroupInfo> groupInputs = Maps.newHashMap();
   
-  private String javaOpts = "";
+  private String taskLaunchCmdOpts = "";
 
+  /**
+   * Create a new vertex with the given name.
+   * 
+   * @param vertexName
+   *          Name of the vertex
+   * @param processorDescriptor
+   *          Description of the processor that is executed in every task of
+   *          this vertex
+   * @param parallelism
+   *          Number of tasks in this vertex. Set to -1 if this is going to be
+   *          decided at runtime. Parallelism may change at runtime due to graph
+   *          reconfigurations.
+   * @param taskResource
+   *          Physical resources like memory/cpu that are used by each task of this
+   *          vertex
+   */
   public Vertex(String vertexName,
       ProcessorDescriptor processorDescriptor,
       int parallelism,
@@ -78,14 +94,28 @@ public class Vertex {
     }
   }
 
-  public String getVertexName() { // FIXME rename to getName()
+  /**
+   * Get the vertex name
+   * @return vertex name
+   */
+  public String getName() {
     return vertexName;
   }
 
+  /**
+   * Get the vertex task processor descriptor
+   * @return the processor descriptor
+   */
   public ProcessorDescriptor getProcessorDescriptor() {
     return this.processorDescriptor;
   }
 
+  /**
+   * Get the number of tasks specified to run in this vertex. It may
+   * be -1 if the parallelism is defined at runtime. Parallelism may change at 
+   * runtime
+   * @return vertex parallelism
+   */
   public int getParallelism() {
     return parallelism;
   }
@@ -94,10 +124,20 @@ public class Vertex {
     this.parallelism = parallelism;
   }
 
+  /**
+   * Get the resources for the vertex
+   * @return the physical resources like cpu/memory of each vertex task
+   */
   public Resource getTaskResource() {
     return taskResource;
   }
 
+  /**
+   * Specify location hints for the tasks of this vertex. Hints must be specified 
+   * for all tasks as defined by the parallelism
+   * @param locations list of locations for each task in the vertex
+   * @return this Vertex
+   */
   public Vertex setTaskLocationsHint(List<TaskLocationHint> locations) {
     if (locations == null) {
       return this;
@@ -113,31 +153,61 @@ public class Vertex {
     return taskLocationsHint;
   }
 
-  public Vertex setTaskLocalResources(Map<String, LocalResource> localResources) {
-    if (localResources == null) {
+  /**
+   * Set the files etc that must be provided to the tasks of this vertex
+   * @param localFiles
+   *          files that must be available locally for each task. These files
+   *          may be regular files, archives etc. as specified by the value
+   *          elements of the map.
+   * @return this Vertex
+   */
+  public Vertex setTaskLocalFiles(Map<String, LocalResource> localFiles) {
+    if (localFiles == null) {
       this.taskLocalResources = new HashMap<String, LocalResource>();
     } else {
-      this.taskLocalResources = localResources;
+      this.taskLocalResources = localFiles;
     }
     return this;
   }
 
-  public Map<String, LocalResource> getTaskLocalResources() {
+  /**
+   * Get the files etc that must be provided to the tasks of this vertex
+   * @return local files of the vertex. Key is the file name.
+   */
+  public Map<String, LocalResource> getTaskLocalFiles() {
     return taskLocalResources;
   }
 
+  /**
+   * Set the Key-Value pairs of environment variables for tasks of this vertex.
+   * This method should be used if different vertices need different env. Else,
+   * set environment for all vertices via TezConfiguration#TEZ_TASK_LAUNCH_ENV
+   * @param environment
+   * @return this Vertex
+   */
   public Vertex setTaskEnvironment(Map<String, String> environment) {
     Preconditions.checkArgument(environment != null);
     this.taskEnvironment.putAll(environment);
     return this;
   }
 
+  /**
+   * Get the environment variables of the tasks
+   * @return environment variable map
+   */
   public Map<String, String> getTaskEnvironment() {
     return taskEnvironment;
   }
 
-  public Vertex setJavaOpts(String javaOpts){
-     this. javaOpts = javaOpts;
+  /**
+   * Set the command opts for tasks of this vertex. This method should be used 
+   * when different vertices have different opts. Else, set the launch opts for
+   * all vertices via TezConfiguration#TEZ_TASK_LAUNCH_CMD_OPTS
+   * @param cmdOpts
+   * @return this Vertex
+   */
+  public Vertex setTaskLaunchCmdOpts(String cmdOpts){
+     this.taskLaunchCmdOpts = cmdOpts;
      return this;
   }
   
@@ -211,6 +281,23 @@ public class Vertex {
     return this;
   }
 
+  /**
+   * Specifies an Output for a Vertex. This is meant to be used when a Vertex
+   * writes Output directly to an external destination. </p>
+   * 
+   * If an output of the vertex is meant to be consumed by another Vertex in the
+   * DAG - use the {@link DAG addEdge} method.
+   * 
+   * If a vertex needs to generate data to an external source as well as for
+   * another Vertex in the DAG, a combination of this API and the DAG.addEdge
+   * API can be used.
+   * 
+   * @param outputName
+   *          the name of the output. This will be used when accessing the
+   *          output in the {@link LogicalIOProcessor}
+   * @param outputDescriptor
+   * @return this Vertex
+   */
   public Vertex addOutput(String outputName, OutputDescriptor outputDescriptor) {
     return addOutput(outputName, outputDescriptor, null);
   }
@@ -229,8 +316,12 @@ public class Vertex {
     return this;
   }
 
-  public String getJavaOpts(){
-	  return javaOpts;
+  /**
+   * Get the launch command opts for tasks in this vertex
+   * @return launch command opts
+   */
+  public String getTaskLaunchCmdOpts(){
+	  return taskLaunchCmdOpts;
   }
 
   @Override
@@ -249,7 +340,7 @@ public class Vertex {
   void addGroupInput(String groupName, GroupInfo groupInputInfo) {
     if (groupInputs.put(groupName, groupInputInfo) != null) {
       throw new IllegalStateException(
-          "Vertex: " + getVertexName() + 
+          "Vertex: " + getName() + 
           " already has group input with name:" + groupName);
     }
   }
@@ -287,5 +378,4 @@ public class Vertex {
   List<RootInputLeafOutput<OutputDescriptor>> getOutputs() {
     return additionalOutputs;
   }
-  // FIXME how do we support profiling? Can't profile all tasks.
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
index db54a39..48b0873 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
@@ -120,7 +120,7 @@ public class VertexGroup {
   }
   
   void addOutputVertex(Vertex outputVertex, GroupInputEdge edge) {
-    this.groupInfo.edgeMergedInputs.put(outputVertex.getVertexName(), edge.getMergedInput());
+    this.groupInfo.edgeMergedInputs.put(outputVertex.getName(), edge.getMergedInput());
   }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 126f26c..812dac9 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -336,7 +336,7 @@ public class DAGClientRPCImpl implements DAGClient {
     Set<String> vertexNames = new HashSet<String>();
     if (vertices != null) {
       for (Vertex vertex : vertices) {
-        vertexNames.add(vertex.getVertexName());
+        vertexNames.add(vertex.getName());
       }
     }
     return _waitForCompletionWithStatusUpdates(vertexNames, statusGetOpts);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 1294d9b..8f6b516 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -106,10 +106,10 @@ public class TestDAGPlan {
         .setUserPayload("processor2Bytes".getBytes());
     Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
     Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
-    v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>());
-    v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>());
+    v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+    v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalFiles(new HashMap<String, LocalResource>());
 
     InputDescriptor inputDescriptor = new InputDescriptor("input").setUserPayload("inputBytes"
         .getBytes());
@@ -141,10 +141,10 @@ public class TestDAGPlan {
         setUserPayload("processor2Bytes".getBytes());
     Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
     Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
-    v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>());
-    v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>());
+    v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+    v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalFiles(new HashMap<String, LocalResource>());
 
     InputDescriptor inputDescriptor = new InputDescriptor("input").
         setUserPayload("inputBytes".getBytes());
@@ -205,12 +205,12 @@ public class TestDAGPlan {
     Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
     Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
     Vertex v3 = new Vertex("v3", pd3, 1, Resource.newInstance(1024, 1));
-    v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>());
-    v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>());
-    v3.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>());
+    v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+    v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+    v3.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalFiles(new HashMap<String, LocalResource>());
 
     InputDescriptor inputDescriptor = new InputDescriptor("input").
         setUserPayload("inputBytes".getBytes());
@@ -273,10 +273,10 @@ public class TestDAGPlan {
         setUserPayload("processor2Bytes".getBytes());
     Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
     Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
-    v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>());
-    v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>());
+    v1.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalFiles(new HashMap<String, LocalResource>());
+    v2.setTaskLaunchCmdOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalFiles(new HashMap<String, LocalResource>());
 
     InputDescriptor inputDescriptor = new InputDescriptor("input").
         setUserPayload("inputBytes".getBytes());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c3661f9..41b808b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -997,14 +997,14 @@ public class DAGAppMaster extends AbstractService {
       preWarmVertex.setTaskEnvironment(preWarmContext.getEnvironment());
     }
     if (preWarmContext.getLocalResources() != null) {
-      preWarmVertex.setTaskLocalResources(preWarmContext.getLocalResources());
+      preWarmVertex.setTaskLocalFiles(preWarmContext.getLocalResources());
     }
     if (preWarmContext.getLocationHints() != null) {
       preWarmVertex.setTaskLocationsHint(
         preWarmContext.getLocationHints().getTaskLocationHints());
     }
     if (preWarmContext.getJavaOpts() != null) {
-      preWarmVertex.setJavaOpts(preWarmContext.getJavaOpts());
+      preWarmVertex.setTaskLaunchCmdOpts(preWarmContext.getJavaOpts());
     }
     dag.addVertex(preWarmVertex);
     LOG.info("Pre-warming containers"

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 34d1e11..e820bda 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -189,9 +189,9 @@ public class FilterLinesByWord extends Configured implements Tool {
       Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
       stage1LocalResources.putAll(commonLocalResources);
       MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo, stage1LocalResources);
-      stage1Vertex.setTaskLocalResources(stage1LocalResources);
+      stage1Vertex.setTaskLocalFiles(stage1LocalResources);
     } else {
-      stage1Vertex.setTaskLocalResources(commonLocalResources);
+      stage1Vertex.setTaskLocalFiles(commonLocalResources);
     }
 
     // Configure the Input for stage1
@@ -207,7 +207,7 @@ public class FilterLinesByWord extends Configured implements Tool {
         FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
         .createUserPayloadFromConf(stage2Conf)), 1,
         MRHelpers.getReduceResource(stage2Conf));
-    stage2Vertex.setTaskLocalResources(commonLocalResources);
+    stage2Vertex.setTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2
     OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 23fa0e0..30fb60f 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -175,9 +175,9 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
       Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
       stage1LocalResources.putAll(commonLocalResources);
       MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo, stage1LocalResources);
-      stage1Vertex.setTaskLocalResources(stage1LocalResources);
+      stage1Vertex.setTaskLocalFiles(stage1LocalResources);
     } else {
-      stage1Vertex.setTaskLocalResources(commonLocalResources);
+      stage1Vertex.setTaskLocalFiles(commonLocalResources);
     }
 
     // Configure the Input for stage1
@@ -193,7 +193,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
         FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
         .createUserPayloadFromConf(stage2Conf)), stage1NumTasks,
         MRHelpers.getMapResource(stage2Conf));
-    stage2Vertex.setTaskLocalResources(commonLocalResources);
+    stage2Vertex.setTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2
     stage2Vertex.addOutput("MROutput",

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index e0bcca2..fa76bca 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -584,9 +584,9 @@ public class MRRSleepJob extends Configured implements Tool {
       mapLocalResources.putAll(commonLocalResources);
       MRHelpers.updateLocalResourcesForInputSplits(remoteFs, inputSplitInfo,
           mapLocalResources);
-      mapVertex.setTaskLocalResources(mapLocalResources);
+      mapVertex.setTaskLocalFiles(mapLocalResources);
     } else {
-      mapVertex.setTaskLocalResources(commonLocalResources);
+      mapVertex.setTaskLocalFiles(commonLocalResources);
     }
 
     if (generateSplitsInAM) {
@@ -610,7 +610,7 @@ public class MRRSleepJob extends Configured implements Tool {
                 new ProcessorDescriptor(ReduceProcessor.class.getName()).
                 setUserPayload(iReduceUserPayload), numIReducer,
                 MRHelpers.getReduceResource(iconf));
-        ivertex.setTaskLocalResources(commonLocalResources);
+        ivertex.setTaskLocalFiles(commonLocalResources);
         vertices.add(ivertex);
       }
     }
@@ -621,7 +621,7 @@ public class MRRSleepJob extends Configured implements Tool {
       finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
           ReduceProcessor.class.getName()).setUserPayload(reducePayload),
           numReducer, MRHelpers.getReduceResource(finalReduceConf));
-      finalReduceVertex.setTaskLocalResources(commonLocalResources);
+      finalReduceVertex.setTaskLocalFiles(commonLocalResources);
       MRHelpers.addMROutputLegacy(finalReduceVertex, reducePayload);
       vertices.add(finalReduceVertex);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 995a1ba..6e4f22c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -216,9 +216,9 @@ public class OrderedWordCount extends Configured implements Tool {
       mapLocalResources.putAll(commonLocalResources);
       MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo,
           mapLocalResources);
-      mapVertex.setTaskLocalResources(mapLocalResources);
+      mapVertex.setTaskLocalFiles(mapLocalResources);
     } else {
-      mapVertex.setTaskLocalResources(commonLocalResources);
+      mapVertex.setTaskLocalFiles(commonLocalResources);
     }
 
     Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
@@ -231,7 +231,7 @@ public class OrderedWordCount extends Configured implements Tool {
         setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf)),
         2,
         MRHelpers.getReduceResource(iReduceStageConf));
-    ivertex.setTaskLocalResources(commonLocalResources);
+    ivertex.setTaskLocalFiles(commonLocalResources);
     vertices.add(ivertex);
 
     byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
@@ -239,7 +239,7 @@ public class OrderedWordCount extends Configured implements Tool {
         new ProcessorDescriptor(
             ReduceProcessor.class.getName()).setUserPayload(finalReducePayload),
                 1, MRHelpers.getReduceResource(finalReduceConf));
-    finalReduceVertex.setTaskLocalResources(commonLocalResources);
+    finalReduceVertex.setTaskLocalFiles(commonLocalResources);
     MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
     vertices.add(finalReduceVertex);
 
@@ -405,11 +405,11 @@ public class OrderedWordCount extends Configured implements Tool {
           Map<String, LocalResource> contextLocalRsrcs =
             new TreeMap<String, LocalResource>();
           contextLocalRsrcs.putAll(
-            dag.getVertex("initialmap").getTaskLocalResources());
+            dag.getVertex("initialmap").getTaskLocalFiles());
           Map<String, String> contextEnv = new TreeMap<String, String>();
           contextEnv.putAll(dag.getVertex("initialmap").getTaskEnvironment());
           String contextJavaOpts =
-            dag.getVertex("initialmap").getJavaOpts();
+            dag.getVertex("initialmap").getTaskLaunchCmdOpts();
           context
             .setLocalResources(contextLocalRsrcs)
             .setJavaOpts(contextJavaOpts)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index faf2192..90f3055 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -428,9 +428,9 @@ public class YARNRunner implements ClientProtocol {
         : MRHelpers.getReduceJavaOpts(stageConf);
 
     vertex.setTaskEnvironment(taskEnv)
-        .setTaskLocalResources(taskLocalResources)
+        .setTaskLocalFiles(taskLocalResources)
         .setTaskLocationsHint(locations)
-        .setJavaOpts(taskJavaOpts);
+        .setTaskLaunchCmdOpts(taskJavaOpts);
     
     if (!isMap) {
       vertex.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
@@ -439,9 +439,9 @@ public class YARNRunner implements ClientProtocol {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Adding vertex to DAG" + ", vertexName="
-          + vertex.getVertexName() + ", processor="
+          + vertex.getName() + ", processor="
           + vertex.getProcessorDescriptor().getClassName() + ", parallelism="
-          + vertex.getParallelism() + ", javaOpts=" + vertex.getJavaOpts()
+          + vertex.getParallelism() + ", javaOpts=" + vertex.getTaskLaunchCmdOpts()
           + ", resources=" + vertex.getTaskResource()
       // TODO Add localResources and Environment
       );

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cfe9a42a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index dae054e..f15ecfa 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -502,7 +502,7 @@ public class TestMRRJobsDAGApi {
           createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(),
               LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
 
-      stage1Vertex.setTaskLocalResources(stage1LocalResources);
+      stage1Vertex.setTaskLocalFiles(stage1LocalResources);
       stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
     }
     


Mime
View raw message