tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-169. Tez should not allow a 0-vertex dag or dags with vertices that have 0 tasks. (hitesh)
Date Fri, 21 Jun 2013 17:20:07 GMT
Updated Branches:
  refs/heads/master 4a388b42a -> efacbb5d2


TEZ-169. Tez should not allow a 0-vertex dag or dags with vertices that have 0 tasks. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/efacbb5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/efacbb5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/efacbb5d

Branch: refs/heads/master
Commit: efacbb5d2bd45d8e51b54eccb90314a835af4263
Parents: 4a388b4
Author: Hitesh Shah <hitesh@apache.org>
Authored: Fri Jun 21 10:18:59 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Fri Jun 21 10:18:59 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/api/DAG.java   | 13 ++-
 .../org/apache/tez/dag/api/TestDAGVerify.java   | 43 ++++++++++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 83 ++------------------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 28 +------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 58 +++++++++++---
 .../processor/reduce/ReduceProcessor.java       |  3 -
 6 files changed, 111 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/efacbb5d/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 180a368..58ac804 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -138,7 +138,11 @@ public class DAG { // FIXME rename to Topology
     verify(true);
   }
   
-  public void verify(boolean restricted) throws IllegalStateException  { 
+  public void verify(boolean restricted) throws IllegalStateException  {
+    if (vertices.isEmpty()) {
+      throw new IllegalStateException("Invalid dag containing 0 vertices");
+    }
+
     Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>();
     for(Edge e : edges){
       Vertex inputVertex = e.getInputVertex();
@@ -154,7 +158,12 @@ public class DAG { // FIXME rename to Topology
     Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
     for(Vertex v : vertices){
       if(vertexMap.containsKey(v.getVertexName())){
-         throw new IllegalStateException("DAG contains multiple vertices with name: " + v.getVertexName());
+         throw new IllegalStateException("DAG contains multiple vertices"
+             + " with name: " + v.getVertexName());
+      }
+      if (v.getParallelism() == 0) {
+        throw new IllegalStateException("Vertex configured with 0 tasks"
+            + ", vertexName=" + v.getVertexName());
       }
       vertexMap.put(v.getVertexName(), new AnnotatedVertex(v));
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/efacbb5d/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 6134f42..eaf8331 100644
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -191,4 +191,47 @@ public class TestDAGVerify {
     System.out.println(ex.getMessage());
     Assert.assertTrue(ex.getMessage().startsWith("Vertex has outDegree>1"));
   }
+
+  @Test
+  public void testDagWithNoVertices() {
+    IllegalStateException ex=null;
+    try {
+      DAG dag = new DAG("testDag");
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    System.out.println(ex.getMessage());
+    Assert.assertTrue(ex.getMessage()
+        .startsWith("Invalid dag containing 0 vertices"));
+  }
+
+  @Test
+  public void testVertexWithNoTasks() {
+    IllegalStateException ex=null;
+    try {
+      Vertex v1 = new Vertex("v1", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+      Vertex v2 = new Vertex("v2", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), 0);
+      Vertex v3 = new Vertex("v3", new ProcessorDescriptor("org.apache.tez.mapreduce.processor.reduce.MapProcessor", null), dummyTaskCount);
+      Edge e1 = new Edge(v1, v2, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
+      Edge e2 = new Edge(v2, v3, new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE, new OutputDescriptor("dummy output class", null), new InputDescriptor("dummy input class", null)));
+      DAG dag = new DAG("testDag");
+      dag.addVertex(v1);
+      dag.addVertex(v2);
+      dag.addVertex(v3);
+      dag.addEdge(e1);
+      dag.addEdge(e2);
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    System.out.println(ex.getMessage());
+    Assert.assertTrue(ex.getMessage()
+        .startsWith("Vertex configured with 0 tasks"));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/efacbb5d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 837a8b1..9c33523 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -60,6 +60,7 @@ import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.ProgressBuilder;
+import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
@@ -204,11 +205,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
               EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.FAILED),
               DAGEventType.DAG_VERTEX_COMPLETED,
               new VertexCompletedTransition())
-          .addTransition
-              (DAGState.RUNNING,
-              EnumSet.of(DAGState.RUNNING, DAGState.SUCCEEDED, DAGState.FAILED),
-              DAGEventType.DAG_COMPLETED,
-              new JobNoTasksCompletedTransition())
           .addTransition(DAGState.RUNNING, DAGState.KILL_WAIT,
               DAGEventType.DAG_KILL, new KillVerticesTransition())
           .addTransition(DAGState.RUNNING, DAGState.RUNNING,
@@ -786,45 +782,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
   */
 
-//  /**
-//   * ChainMapper and ChainReducer must execute in parallel, so they're not
-//   * compatible with uberization/LocalContainerLauncher (100% sequential).
-//   */
-//  private boolean isChainJob(Configuration conf) {
-//    boolean isChainJob = false;
-//    try {
-//      String mapClassName = conf.get(MRJobConfig.MAP_CLASS_ATTR);
-//      if (mapClassName != null) {
-//        Class<?> mapClass = Class.forName(mapClassName);
-//        if (ChainMapper.class.isAssignableFrom(mapClass))
-//          isChainJob = true;
-//      }
-//    } catch (ClassNotFoundException cnfe) {
-//      // don't care; assume it's not derived from ChainMapper
-//    }
-//    try {
-//      String reduceClassName = conf.get(MRJobConfig.REDUCE_CLASS_ATTR);
-//      if (reduceClassName != null) {
-//        Class<?> reduceClass = Class.forName(reduceClassName);
-//        if (ChainReducer.class.isAssignableFrom(reduceClass))
-//          isChainJob = true;
-//      }
-//    } catch (ClassNotFoundException cnfe) {
-//      // don't care; assume it's not derived from ChainReducer
-//    }
-//    return isChainJob;
-//  }
-
-  /*
-  private int getBlockSize() {
-    String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);
-    if (inputClassName != null) {
-      Class<?> inputClass - Class.forName(inputClassName);
-      if (FileInputFormat<K, V>)
-    }
-  }
-  */
-
   public static class InitTransition
       implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
@@ -844,25 +801,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         setup(dag);
         dag.fs = dag.getFileSystem(dag.conf);
 
-        checkTaskLimits();
-
-        // TODO: Committer
-        /*
-        if (dag.newApiCommitter) {
-          dag.dagContext = new JobContextImpl(dag.conf,
-              dag.oldJobId);
-        } else {
-          dag.dagContext = new org.apache.hadoop.mapred.JobContextImpl(
-              dag.conf, dag.oldJobId);
+        // If we have no vertices, fail the dag
+        dag.numVertices = dag.getJobPlan().getVertexCount();
+        if (dag.numVertices == 0) {
+          dag.addDiagnostic("No vertices for dag");
+          dag.abortJob(DAGStatus.State.FAILED);
+          return dag.finished(DAGState.FAILED);
         }
 
-        // do the setup
-        dag.committer.setupJob(dag.dagContext);
-        dag.setupProgress = 1.0f;
-        */
+        checkTaskLimits();
 
         // create the vertices
-        dag.numVertices = dag.getJobPlan().getVertexCount();
         for (int i=0; i < dag.numVertices; ++i) {
           String vertexName = dag.getJobPlan().getVertex(i).getName();
           VertexImpl v = createVertex(dag, vertexName, i);
@@ -1033,12 +982,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       // TODO Metrics
       //job.metrics.runningJob(job);
 
-			// If we have no tasks, just transition to job completed
-      if (job.numVertices == 0) {
-        job.eventHandler.handle(
-            new DAGEvent(job.dagId, DAGEventType.DAG_COMPLETED));
-      }
-
       // Start all vertices with no incoming edges when job starts
       job.startRootVertices();
     }
@@ -1172,16 +1115,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
   }
 
-  // Transition class for handling jobs with no vertices
-  static class JobNoTasksCompletedTransition implements
-  MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
-
-    @Override
-    public DAGState transition(DAGImpl dag, DAGEvent event) {
-      return checkJobForCompletion(dag);
-    }
-  }
-
   private void addDiagnostic(String diag) {
     diagnostics.add(diag);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/efacbb5d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 4fb41f2..a65962f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -228,12 +228,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexState.SUCCEEDED, VertexState.FAILED),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedTransition())
-          .addTransition
-              (VertexState.RUNNING,
-              EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED,
-                  VertexState.FAILED),
-              VertexEventType.V_COMPLETED,
-              new VertexNoTasksCompletedTransition())
           .addTransition(VertexState.RUNNING, VertexState.KILL_WAIT,
               VertexEventType.V_KILL, new KillTasksTransition())
           .addTransition(VertexState.RUNNING, VertexState.RUNNING,
@@ -815,7 +809,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         */
 
         if (vertex.numTasks == 0) {
-          vertex.addDiagnostic("No of tasks for vertex " + vertex.getVertexId());
+          vertex.addDiagnostic("No tasks for vertex " + vertex.getVertexId());
+          vertex.abortVertex(VertexStatus.State.FAILED);
+          return vertex.finished(VertexState.FAILED);
         }
 
         checkTaskLimits();
@@ -827,8 +823,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         // create the Tasks but don't start them yet
         createTasks(vertex);
 
-
-
         boolean hasBipartite = false;
         if (vertex.sourceVertices != null) {
           for (EdgeProperty edgeProperty : vertex.sourceVertices.values()) {
@@ -982,11 +976,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                                                vertex.distanceFromRoot));
       }
 
-      // If we have no tasks, just transition to vertex completed
-      if (vertex.numTasks == 0) {
-        vertex.eventHandler.handle(
-            new VertexEvent(vertex.vertexId, VertexEventType.V_COMPLETED));
-      }
     }
   }
 
@@ -1199,17 +1188,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
-  // Transition class for handling jobs with no tasks
-  // TODODAGAM - is this allowed for a vertex?
-  static class VertexNoTasksCompletedTransition implements
-  MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
-
-    @Override
-    public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      return VertexImpl.checkVertexForCompletion(vertex);
-    }
-  }
-
   private static class TaskRescheduledTransition implements
       SingleArcTransition<VertexImpl, VertexEvent> {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/efacbb5d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index ac0ea8d..10e92d5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -192,6 +192,35 @@ public class TestVertexImpl {
     }
   }
 
+  private DAGPlan createInvalidDAGPlan() {
+    LOG.info("Setting up invalid dag plan");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("testverteximplinvalid")
+        .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex1")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host1")
+                .addRack("rack1")
+                .build()
+            )
+        .setTaskConfig(
+            PlanTaskConfiguration.newBuilder()
+            .setNumTasks(0)
+            .setVirtualCores(4)
+            .setMemoryMb(1024)
+            .setJavaOpts("")
+            .setTaskModule("x1.y1")
+            .build()
+            )
+        .addOutEdgeId("e1")
+        .build()
+        )
+        .build();
+    return dag;
+  }
 
   private DAGPlan createTestDAGPlan() {
     LOG.info("Setting up dag plan");
@@ -209,7 +238,7 @@ public class TestVertexImpl {
                 )
             .setTaskConfig(
                 PlanTaskConfiguration.newBuilder()
-                .setNumTasks(0)
+                .setNumTasks(1)
                 .setVirtualCores(4)
                 .setMemoryMb(1024)
                 .setJavaOpts("")
@@ -491,6 +520,7 @@ public class TestVertexImpl {
     dispatcher.register(TaskEventType.class, new TaskEventHandler());
     dispatcher.init(conf);
     dispatcher.start();
+
   }
 
   @After
@@ -683,17 +713,6 @@ public class TestVertexImpl {
   }
 
   @Test(timeout = 5000)
-  public void testVertexWithNoTasks() {
-    // FIXME a vertex with no tasks should not be allowed
-    initAllVertices();
-
-    VertexImpl v = vertices.get("vertex1");
-    startVertex(v, false);
-    dispatcher.await();
-    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
-  }
-
-  @Test(timeout = 5000)
   public void testVertexKillDiagnostics() {
     VertexImpl v1 = vertices.get("vertex1");
     killVertex(v1, false);
@@ -1104,4 +1123,19 @@ public class TestVertexImpl {
         dagEventDispatcher.eventCount.get(
             DAGEventType.INTERNAL_ERROR).intValue());
   }
+
+  @Test
+  public void testVertexWithNoTasks() {
+    TezDAGID invalidDagId = new TezDAGID(
+        dagId.getApplicationId(), 1000);
+    DAGPlan dPlan = createInvalidDAGPlan();
+    TezVertexID vId = new TezVertexID(invalidDagId, 1);
+    VertexPlan vPlan = dPlan.getVertex(0);
+    VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
+        dispatcher.getEventHandler(), taskAttemptListener, jobToken, fsTokens,
+        clock, thh, appContext, vertexLocationHint);
+    v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
+    Assert.assertEquals(VertexState.FAILED, v.getState());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/efacbb5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index de925b3..e8054d2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -66,7 +66,6 @@ implements Processor {
 
   private Counter reduceInputKeyCounter;
   private Counter reduceInputValueCounter;
-  private int numMapTasks;
 
   public ReduceProcessor(TezEngineTaskContext context) {
     super(context);
@@ -77,8 +76,6 @@ implements Processor {
         tezEngineContext.getInputSpecList().size() == 1,
         "Expected exactly one input, found : "
             + tezEngineContext.getInputSpecList().size());
-    this.numMapTasks = tezEngineContext.getInputSpecList().get(0)
-        .getNumInputs();
   }
   
   @Override


Mime
View raw message