tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-250. Make sure Vertex task resource is always defined. (hitesh)
Date Fri, 21 Jun 2013 22:05:41 GMT
Updated Branches:
  refs/heads/master efacbb5d2 -> 901ecbe8d


TEZ-250. Make sure Vertex task resource is always defined. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/901ecbe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/901ecbe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/901ecbe8

Branch: refs/heads/master
Commit: 901ecbe8d5285f15ef2b7832f22cb7a4cb674904
Parents: efacbb5
Author: Hitesh Shah <hitesh@apache.org>
Authored: Fri Jun 21 15:05:14 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Fri Jun 21 15:05:14 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/api/DAG.java   |  85 ++++----
 .../java/org/apache/tez/dag/api/Vertex.java     |  19 +-
 .../org/apache/tez/dag/api/TestDAGPlan.java     |  18 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 210 +++++++++++++------
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  15 +-
 .../org/apache/tez/mapreduce/YARNRunner.java    |  34 +--
 6 files changed, 228 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/901ecbe8/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 58ac804..1b8bbcb 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -67,7 +67,7 @@ public class DAG { // FIXME rename to Topology
   public synchronized List<Vertex> getVertices() {
     return Collections.unmodifiableList(this.vertices);
   }
-  
+
   public synchronized DAG addEdge(Edge edge) {
     // Sanity checks
     if (!vertices.contains(edge.getInputVertex())) {
@@ -76,17 +76,17 @@ public class DAG { // FIXME rename to Topology
     }
     if (!vertices.contains(edge.getOutputVertex())) {
       throw new IllegalArgumentException(
-          "Output vertex " + edge.getOutputVertex() + " doesn't exist!");    
+          "Output vertex " + edge.getOutputVertex() + " doesn't exist!");
     }
     if (edges.contains(edge)) {
       throw new IllegalArgumentException(
           "Edge " + edge + " already defined!");
     }
-    
+
     // Inform the vertices
     edge.getInputVertex().addOutputVertex(edge.getOutputVertex(), edge.getId());
     edge.getOutputVertex().addInputVertex(edge.getInputVertex(), edge.getId());
-    
+
     edges.add(edge);
     return this;
   }
@@ -94,18 +94,18 @@ public class DAG { // FIXME rename to Topology
   public String getName() {
     return this.name;
   }
-  
-  // AnnotatedVertex is used by verify() 
+
+  // AnnotatedVertex is used by verify()
   private class AnnotatedVertex {
     Vertex v;
-  
-    int index; //for Tarjan's algorithm    
+
+    int index; //for Tarjan's algorithm
     int lowlink; //for Tarjan's algorithm
-    boolean onstack; //for Tarjan's algorithm 
+    boolean onstack; //for Tarjan's algorithm
 
     int inDegree;
     int outDegree;
-    
+
     private AnnotatedVertex(Vertex v){
        this.v = v;
        index = -1;
@@ -114,9 +114,9 @@ public class DAG { // FIXME rename to Topology
        outDegree = 0;
     }
   }
-  
+
   // verify()
-  // 
+  //
   // Default rules
   //   Illegal:
   //     - duplicate vertex id
@@ -130,14 +130,14 @@ public class DAG { // FIXME rename to Topology
   //     - orphaned vertex in DAG of >1 vertex.  Could be unrelated map-only job.
   //     - v1->v2 via two edges.  perhaps some self-join job would use this?
   //
-  // "restricted" mode: 
-  //   In short term, the supported DAGs are limited. Call with restricted=true for these
verifications.  
-  //   Illegal: 
-  //     - any vertex with more than one input or output edge. (n-ary input, n-ary merge)

+  // "restricted" mode:
+  //   In short term, the supported DAGs are limited. Call with restricted=true for these
verifications.
+  //   Illegal:
+  //     - any vertex with more than one input or output edge. (n-ary input, n-ary merge)
   public void verify() throws IllegalStateException {
     verify(true);
   }
-  
+
   public void verify(boolean restricted) throws IllegalStateException  {
     if (vertices.isEmpty()) {
       throw new IllegalStateException("Invalid dag containing 0 vertices");
@@ -153,23 +153,20 @@ public class DAG { // FIXME rename to Topology
       }
       edgeList.add(e);
     }
-    
-    // check for duplicate vertex names, and prepare for cycle detection
+
+    // check for valid vertices, duplicate vertex names,
+    // and prepare for cycle detection
     Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
     for(Vertex v : vertices){
       if(vertexMap.containsKey(v.getVertexName())){
          throw new IllegalStateException("DAG contains multiple vertices"
              + " with name: " + v.getVertexName());
       }
-      if (v.getParallelism() == 0) {
-        throw new IllegalStateException("Vertex configured with 0 tasks"
-            + ", vertexName=" + v.getVertexName());
-      }
       vertexMap.put(v.getVertexName(), new AnnotatedVertex(v));
     }
-    
+
     detectCycles(edgeMap, vertexMap);
-    
+
     if(restricted){
       for(Edge e : edges){
         vertexMap.get(e.getInputVertex().getVertexName()).outDegree++;
@@ -177,18 +174,20 @@ public class DAG { // FIXME rename to Topology
       }
       for(AnnotatedVertex av: vertexMap.values()){
         if(av.inDegree > 1){
-          throw new IllegalStateException("Vertex has inDegree>1: " + av.v.getVertexName());
+          throw new IllegalStateException("Vertex has inDegree>1: "
+              + av.v.getVertexName());
         }
         if(av.outDegree > 1){
-          throw new IllegalStateException("Vertex has outDegree>1: " + av.v.getVertexName());
+          throw new IllegalStateException("Vertex has outDegree>1: "
+              + av.v.getVertexName());
         }
       }
     }
   }
-  
+
   // Adaptation of Tarjan's algorithm for connected components.
   // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
-  private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex>
vertexMap) 
+  private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex>
vertexMap)
       throws IllegalStateException{
     Integer nextIndex = 0; // boxed integer so it is passed by reference.
     Stack<AnnotatedVertex> stack = new Stack<DAG.AnnotatedVertex>();
@@ -203,16 +202,16 @@ public class DAG { // FIXME rename to Topology
   // part of Tarjan's algorithm for connected components.
   // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
   private void strongConnect(
-          AnnotatedVertex av, 
-          Map<String, AnnotatedVertex> vertexMap, 
-          Map<Vertex, List<Edge>> edgeMap, 
+          AnnotatedVertex av,
+          Map<String, AnnotatedVertex> vertexMap,
+          Map<Vertex, List<Edge>> edgeMap,
           Stack<AnnotatedVertex> stack, Integer nextIndex) throws IllegalStateException{
     av.index = nextIndex;
     av.lowlink = nextIndex;
     nextIndex++;
     stack.push(av);
     av.onstack = true;
-    
+
     List<Edge> edges = edgeMap.get(av.v);
     if(edges != null){
       for(Edge e : edgeMap.get(av.v)){
@@ -237,7 +236,7 @@ public class DAG { // FIXME rename to Topology
          // this indicates there is a scc/cycle. It comprises all nodes from top of stack
to "av"
          StringBuilder message = new StringBuilder();
          message.append(av.v.getVertexName() + " <- ");
-         for( ; pop != av; pop = stack.pop()){ 
+         for( ; pop != av; pop = stack.pop()){
            message.append(pop.v.getVertexName() + " <- ");
            pop.onstack = false;
          }
@@ -246,15 +245,15 @@ public class DAG { // FIXME rename to Topology
        }
     }
   }
- 
-  
+
+
   // create protobuf message describing DAG
   @Private
   public DAGPlan createDag(Configuration amConf) {
-    
+
     verify(true);
-    
-    DAGPlan.Builder jobBuilder = DAGPlan.newBuilder();  
+
+    DAGPlan.Builder jobBuilder = DAGPlan.newBuilder();
 
     jobBuilder.setName(this.name);
 
@@ -263,8 +262,8 @@ public class DAG { // FIXME rename to Topology
       vertexBuilder.setName(vertex.getVertexName());
       vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until
 TEZ-46.
       vertexBuilder.setProcessorDescriptor(DagTypeConverters
-          .convertToDAGPlan(vertex.getProcessorDescriptor()));      
-      
+          .convertToDAGPlan(vertex.getProcessorDescriptor()));
+
       //task config
       PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
       Resource resource = vertex.getTaskResource();
@@ -320,11 +319,11 @@ public class DAG { // FIXME rename to Topology
       for(String inEdgeId : vertex.getInputEdgeIds()){
         vertexBuilder.addInEdgeId(inEdgeId);
       }
-      
+
       for(String outEdgeId : vertex.getOutputEdgeIds()){
         vertexBuilder.addOutEdgeId(outEdgeId);
       }
-      
+
       vertexBuilder.setTaskConfig(taskConfigBuilder);
       jobBuilder.addVertex(vertexBuilder);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/901ecbe8/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 4bd3217..8803db9 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -33,7 +33,7 @@ public class Vertex { // FIXME rename to Task
 
   private final int parallelism;
   private VertexLocationHint taskLocationsHint;
-  private Resource taskResource;
+  private final Resource taskResource;
   private Map<String, LocalResource> taskLocalResources;
   private Map<String, String> taskEnvironment;
 
@@ -44,10 +44,20 @@ public class Vertex { // FIXME rename to Task
   private String javaOpts = "";
 
 
-  public Vertex(String vertexName, ProcessorDescriptor processorDescriptor, int parallelism)
{
+  public Vertex(String vertexName,
+      ProcessorDescriptor processorDescriptor,
+      int parallelism,
+      Resource taskResource) {
     this.vertexName = vertexName;
     this.processorDescriptor = processorDescriptor;
     this.parallelism = parallelism;
+    this.taskResource = taskResource;
+    if (parallelism == 0) {
+      throw new IllegalArgumentException("Parallelism cannot be 0");
+    }
+    if (taskResource == null) {
+      throw new IllegalArgumentException("Resource cannot be null");
+    }
   }
 
   public String getVertexName() { // FIXME rename to getName()
@@ -62,11 +72,6 @@ public class Vertex { // FIXME rename to Task
     return parallelism;
   }
 
-  public Vertex setTaskResource(Resource resource) {
-    this.taskResource = resource;
-    return this;
-  }
-
   public Resource getTaskResource() {
     return taskResource;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/901ecbe8/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index b735a8b..fdab79d 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -72,11 +72,11 @@ public class TestDAGPlan {
    FileOutputStream outStream = null;
    try {
      outStream = new FileOutputStream(file);
-     job.writeTo(outStream); 
+     job.writeTo(outStream);
    }
    finally {
      if(outStream != null){
-       outStream.close();  
+       outStream.close();
      }
    }
 
@@ -87,12 +87,12 @@ public class TestDAGPlan {
      inJob = DAGPlan.newBuilder().mergeFrom(inputStream).build();
    }
    finally {
-     outStream.close();  
+     outStream.close();
    }
 
    Assert.assertEquals(job, inJob);
   }
-  
+
   @Test
   public void testUserPayloadSerde() {
     DAG dag = new DAG("testDag");
@@ -100,14 +100,12 @@ public class TestDAGPlan {
         ByteBuffer.wrap("processor1Bytes".getBytes()));
     ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2",
         ByteBuffer.wrap("processor2Bytes".getBytes()));
-    Vertex v1 = new Vertex("v1", pd1, 10);
-    Vertex v2 = new Vertex("v2", pd2, 1);
+    Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
+    Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
     v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>())
-        .setTaskResource(Resource.newInstance(1024, 1));
+        .setTaskLocalResources(new HashMap<String, LocalResource>());
     v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>())
-        .setTaskResource(Resource.newInstance(1024, 1));
+        .setTaskLocalResources(new HashMap<String, LocalResource>());
 
     InputDescriptor inputDescriptor = new InputDescriptor("input",
         ByteBuffer.wrap("inputBytes".getBytes()));

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/901ecbe8/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index eaf8331..7d8061d 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -18,33 +18,42 @@
 
 package org.apache.tez.dag.api;
 
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
 import org.apache.tez.dag.api.EdgeProperty.SourceType;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestDAGVerify {
-  
+
   private final String dummyProcessorClassName = TestDAGVerify.class.getName();
   private final String dummyInputClassName = TestDAGVerify.class.getName();
   private final String dummyOutputClassName = TestDAGVerify.class.getName();
   private final int dummyTaskCount = 2;
-  
+  private final Resource dummyTaskResource = Resource.newInstance(1, 1);
+
   //    v1
-  //    |  
+  //    |
   //    v2
   @Test
   public void testVerify1() {
-    Vertex v1 = new Vertex("v1", new ProcessorDescriptor(dummyProcessorClassName, null),
dummyTaskCount);
-    Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor(dummyOutputClassName, null), new InputDescriptor(dummyInputClassName,
null)));
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor(dummyProcessorClassName, null),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor", null),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+            new OutputDescriptor(dummyOutputClassName, null),
+            new InputDescriptor(dummyInputClassName, null)));
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
     dag.addEdge(e1);
     dag.verify();
   }
-  
+
   //    v1 <----
   //      |     ^
   //       v2   ^
@@ -53,14 +62,34 @@ public class TestDAGVerify {
   @Test
   public void testCycle1() {
     IllegalStateException ex=null;
-    Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-    Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-    Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-    Vertex v4 = new Vertex("v4", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
-    Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
-    Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
-    Edge e4 = new Edge(v4, v1, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor("MapProcessor", null),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor", null),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor("MapProcessor", null),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v4 = new Vertex("v4",
+        new ProcessorDescriptor("MapProcessor", null),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+            new OutputDescriptor("dummy output class", null),
+            new InputDescriptor("dummy input class", null)));
+    Edge e2 = new Edge(v2, v3,
+        new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+            new OutputDescriptor("dummy output class", null),
+            new InputDescriptor("dummy input class", null)));
+    Edge e3 = new Edge(v2, v4,
+        new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+            new OutputDescriptor("dummy output class", null),
+            new InputDescriptor("dummy input class", null)));
+    Edge e4 = new Edge(v4, v1,
+        new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+            new OutputDescriptor("dummy output class", null),
+            new InputDescriptor("dummy input class", null)));
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
@@ -80,23 +109,43 @@ public class TestDAGVerify {
     System.out.println(ex.getMessage());
     Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
   }
-  
-//     v1 
-//      |     
-//    -> v2   
-//    ^  | | 
-//    v3    v4
+
+  //     v1
+  //      |
+  //    -> v2
+  //    ^  | |
+  //    v3    v4
   @Test
   public void testCycle2() {
     IllegalStateException ex=null;
-    Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-    Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-    Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-    Vertex v4 = new Vertex("v4", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-    Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
-    Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
-    Edge e3 = new Edge(v2, v4, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
-    Edge e4 = new Edge(v3, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor("MapProcessor", null),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor", null),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor("MapProcessor", null),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v4 = new Vertex("v4",
+        new ProcessorDescriptor("MapProcessor", null),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+            new OutputDescriptor("dummy output class", null),
+            new InputDescriptor("dummy input class", null)));
+    Edge e2 = new Edge(v2, v3,
+        new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+            new OutputDescriptor("dummy output class", null),
+            new InputDescriptor("dummy input class", null)));
+    Edge e3 = new Edge(v2, v4,
+        new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+            new OutputDescriptor("dummy output class", null),
+            new InputDescriptor("dummy input class", null)));
+    Edge e4 = new Edge(v3, v2,
+        new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+            new OutputDescriptor("dummy output class", null),
+            new InputDescriptor("dummy input class", null)));
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
@@ -116,12 +165,16 @@ public class TestDAGVerify {
     System.out.println(ex.getMessage());
     Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
   }
-  
+
   @Test
   public void repeatedVertexName() {
     IllegalStateException ex=null;
-    Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-    Vertex v1repeat = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor("MapProcessor", null),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v1repeat = new Vertex("v1",
+        new ProcessorDescriptor("MapProcessor", null),
+        dummyTaskCount, dummyTaskResource);
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
     dag.addVertex(v1repeat);
@@ -135,19 +188,31 @@ public class TestDAGVerify {
     System.out.println(ex.getMessage());
     Assert.assertTrue(ex.getMessage().startsWith("DAG contains multiple vertices with name"));
   }
-  
+
   //  v1  v2
-  //   |  |     
-  //    v3   
+  //   |  |
+  //    v3
   @Test
   public void BinaryInput() {
     IllegalStateException ex=null;
     try {
-      Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-      Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-      Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-      Edge e1 = new Edge(v1, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
-      Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
+      Vertex v1 = new Vertex("v1",
+          new ProcessorDescriptor("MapProcessor", null),
+          dummyTaskCount, dummyTaskResource);
+      Vertex v2 = new Vertex("v2",
+          new ProcessorDescriptor("MapProcessor", null),
+          dummyTaskCount, dummyTaskResource);
+      Vertex v3 = new Vertex("v3",
+          new ProcessorDescriptor("MapProcessor", null),
+          dummyTaskCount, dummyTaskResource);
+      Edge e1 = new Edge(v1, v3,
+          new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+              new OutputDescriptor("dummy output class", null),
+              new InputDescriptor("dummy input class", null)));
+      Edge e2 = new Edge(v2, v3,
+          new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+              new OutputDescriptor("dummy output class", null),
+              new InputDescriptor("dummy input class", null)));
       DAG dag = new DAG("testDag");
       dag.addVertex(v1);
       dag.addVertex(v2);
@@ -163,19 +228,31 @@ public class TestDAGVerify {
     System.out.println(ex.getMessage());
     Assert.assertTrue(ex.getMessage().startsWith("Vertex has inDegree>1"));
   }
-  
-  //   v1  
-  //  |  |     
-  //  v2  v3 
+
+  //   v1
+  //  |  |
+  //  v2  v3
   @Test
   public void BinaryOutput() {
     IllegalStateException ex=null;
     try {
-      Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-      Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-      Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-      Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
-      Edge e2 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
+      Vertex v1 = new Vertex("v1",
+          new ProcessorDescriptor("MapProcessor", null),
+          dummyTaskCount, dummyTaskResource);
+      Vertex v2 = new Vertex("v2",
+          new ProcessorDescriptor("MapProcessor", null),
+          dummyTaskCount, dummyTaskResource);
+      Vertex v3 = new Vertex("v3",
+          new ProcessorDescriptor("MapProcessor", null),
+          dummyTaskCount, dummyTaskResource);
+      Edge e1 = new Edge(v1, v2,
+          new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+              new OutputDescriptor("dummy output class", null),
+              new InputDescriptor("dummy input class", null)));
+      Edge e2 = new Edge(v1, v2,
+          new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
+              new OutputDescriptor("dummy output class", null),
+              new InputDescriptor("dummy input class", null)));
       DAG dag = new DAG("testDag");
       dag.addVertex(v1);
       dag.addVertex(v2);
@@ -208,30 +285,25 @@ public class TestDAGVerify {
         .startsWith("Invalid dag containing 0 vertices"));
   }
 
+  @SuppressWarnings("unused")
   @Test
-  public void testVertexWithNoTasks() {
-    IllegalStateException ex=null;
+  public void testInvalidVertexConstruction() {
     try {
-      Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-      Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), 0);
-      Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor",
null), dummyTaskCount);
-      Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
-      Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class",
null)));
-      DAG dag = new DAG("testDag");
-      dag.addVertex(v1);
-      dag.addVertex(v2);
-      dag.addVertex(v3);
-      dag.addEdge(e1);
-      dag.addEdge(e2);
-      dag.verify();
+      Vertex v1 = new Vertex("v1",
+          new ProcessorDescriptor("MapProcessor", null),
+          0, dummyTaskResource);
+      Assert.fail("Expected exception for 0 parallelism");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Parallelism cannot be 0"));
     }
-    catch (IllegalStateException e){
-      ex = e;
+    try {
+      Vertex v1 = new Vertex("v1",
+          new ProcessorDescriptor("MapProcessor", null),
+          1, null);
+      Assert.fail("Expected exception for 0 parallelism");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Resource cannot be null"));
     }
-    Assert.assertNotNull(ex);
-    System.out.println(ex.getMessage());
-    Assert.assertTrue(ex.getMessage()
-        .startsWith("Vertex configured with 0 tasks"));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/901ecbe8/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index fa85616..8fe94d9 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -220,13 +220,13 @@ public class TestMRRJobsDAGApi {
         IntWritable.class.getName());
     stage3Conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
         NullOutputFormat.class.getName());
-    
+
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf,
         stage1Conf);
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage3Conf,
         stage2Conf);
-    
+
     MRHelpers.doJobClientMagic(stage1Conf);
     MRHelpers.doJobClientMagic(stage2Conf);
     MRHelpers.doJobClientMagic(stage3Conf);
@@ -240,13 +240,15 @@ public class TestMRRJobsDAGApi {
     Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName(),
         MRHelpers.createByteBufferFromConf(stage1Conf)),
-        inputSplitInfo.getNumTasks());
+        inputSplitInfo.getNumTasks(), Resource.newInstance(256, 1));
     Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
         ReduceProcessor.class.getName(),
-        MRHelpers.createByteBufferFromConf(stage2Conf)), 1);
+        MRHelpers.createByteBufferFromConf(stage2Conf)),
+        1, Resource.newInstance(256, 1));
     Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
         ReduceProcessor.class.getName(),
-        MRHelpers.createByteBufferFromConf(stage3Conf)), 1);
+        MRHelpers.createByteBufferFromConf(stage3Conf)),
+        1, Resource.newInstance(256, 1));
 
     LocalResource appJarLr = createLocalResource(remoteFs,
         remoteFs.makeQualified(APP_JAR_HDFS), LocalResourceType.FILE,
@@ -277,19 +279,16 @@ public class TestMRRJobsDAGApi {
     stage1LocalResources.putAll(commonLocalResources);
 
     stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
-    stage1Vertex.setTaskResource(Resource.newInstance(256, 1));
     stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
     stage1Vertex.setTaskLocalResources(stage1LocalResources);
     stage1Vertex.setTaskEnvironment(commonEnv);
     // TODO env, resources
 
     stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf));
-    stage2Vertex.setTaskResource(Resource.newInstance(256, 1));
     stage2Vertex.setTaskLocalResources(commonLocalResources);
     stage2Vertex.setTaskEnvironment(commonEnv);
 
     stage3Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage3Conf));
-    stage3Vertex.setTaskResource(Resource.newInstance(256, 1));
     stage3Vertex.setTaskLocalResources(commonLocalResources);
     stage3Vertex.setTaskEnvironment(commonEnv);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/901ecbe8/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 7624a9b..58e4c99 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -118,7 +118,7 @@ public class YARNRunner implements ClientProtocol {
   final public static FsPermission DAG_FILE_PERMISSION =
       FsPermission.createImmutable((short) 0644);
   final public static int UTF8_CHUNK_SIZE = 16 * 1024;
-  
+
   private final TezConfiguration tezConf;
   private final TezClient tezClient;
   private DAGClient dagClient;
@@ -158,7 +158,7 @@ public class YARNRunner implements ClientProtocol {
       this.tezClient = new TezClient(tezConf);
       this.clientCache = clientCache;
       this.defaultFileContext = FileContext.getFileContext(this.conf);
-      
+
     } catch (UnsupportedFileSystemException ufe) {
       throw new RuntimeException("Error in instantiating YarnClient", ufe);
     }
@@ -392,15 +392,17 @@ public class YARNRunner implements ClientProtocol {
       }
     }
 
+    Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
+        : MRHelpers.getReduceResource(stageConf);
     Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(
-        processorName, MRHelpers.createByteBufferFromConf(stageConf)), numTasks);
+        processorName, MRHelpers.createByteBufferFromConf(stageConf)),
+        numTasks, taskResource);
 
     Map<String, String> taskEnv = new HashMap<String, String>();
     setupMapReduceEnv(stageConf, taskEnv, isMap);
-    Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
-        : MRHelpers.getReduceResource(stageConf);
 
-    Map<String, LocalResource> taskLocalResources = new TreeMap<String, LocalResource>();
+    Map<String, LocalResource> taskLocalResources =
+        new TreeMap<String, LocalResource>();
     // PRECOMMIT Remove split localization for reduce tasks if it's being set
     // here
     taskLocalResources.putAll(jobLocalResources);
@@ -410,7 +412,7 @@ public class YARNRunner implements ClientProtocol {
 
     vertex.setTaskEnvironment(taskEnv)
         .setTaskLocalResources(taskLocalResources)
-        .setTaskLocationsHint(locations).setTaskResource(taskResource)
+        .setTaskLocationsHint(locations)
         .setJavaOpts(taskJavaOpts);
 
     if (LOG.isDebugEnabled()) {
@@ -487,7 +489,7 @@ public class YARNRunner implements ClientProtocol {
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
   throws IOException, InterruptedException {
 
-    // TEZ-192 - stop using token file 
+    // TEZ-192 - stop using token file
     // Upload only in security mode: TODO
     Path applicationTokensFile =
         new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
@@ -498,11 +500,11 @@ public class YARNRunner implements ClientProtocol {
     }
 
     ApplicationId appId = resMgrDelegate.getApplicationId();
-    
+
     FileSystem fs = FileSystem.get(conf);
     // Loads the job.xml written by the user.
     JobConf jobConf = new JobConf(new TezConfiguration(conf));
-    
+
     // Extract individual raw MR configs.
     Configuration[] stageConfs = MultiStageMRConfToTezTranslator
         .getStageConfs(jobConf);
@@ -516,7 +518,7 @@ public class YARNRunner implements ClientProtocol {
     }
 
     // create inputs to tezClient.submit()
-    
+
     // FIXME set up job resources
     Map<String, LocalResource> jobLocalResources =
         createJobLocalResources(stageConfs[0], jobSubmitDir);
@@ -555,13 +557,13 @@ public class YARNRunner implements ClientProtocol {
       Path appStagingDir = fs.resolvePath(new Path(jobSubmitDir));
       dagClient = tezClient.submitDAGApplication(
           appId,
-          dag, 
-          appStagingDir, 
+          dag,
+          appStagingDir,
           ts,
           jobConf.get(JobContext.QUEUE_NAME,
               YarnConfiguration.DEFAULT_QUEUE_NAME),
-          vargs, 
-          environment, 
+          vargs,
+          environment,
           jobLocalResources, dagAMConf);
 
     } catch (TezException e) {
@@ -653,7 +655,7 @@ public class YARNRunner implements ClientProtocol {
   public void killJob(JobID arg0) throws IOException, InterruptedException {
     /* check if the status is not running, if not send kill to RM */
     JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
-    if (status.getState() == JobStatus.State.RUNNING || 
+    if (status.getState() == JobStatus.State.RUNNING ||
         status.getState() == JobStatus.State.PREP) {
       try {
         resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());


Mime
View raw message