tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-202. DAGSchedulerMRR does not handle failed vertices properly. (hitesh)
Date Thu, 13 Jun 2013 01:09:22 GMT
Updated Branches:
  refs/heads/master ba02c2721 -> ff5a30231


TEZ-202. DAGSchedulerMRR does not handle failed vertices properly. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ff5a3023
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ff5a3023
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ff5a3023

Branch: refs/heads/master
Commit: ff5a3023129dcf05d884152f97f05ffe617b446b
Parents: ba02c27
Author: Hitesh Shah <hitesh@apache.org>
Authored: Wed Jun 12 18:07:51 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Wed Jun 12 18:07:51 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |   8 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 167 ++++++++++++++++++-
 2 files changed, 163 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ff5a3023/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 8f49f3d..d69e4a0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -93,6 +93,8 @@ import org.apache.tez.engine.common.security.JobTokenSecretManager;
 import org.apache.tez.engine.common.security.TokenCache;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** Implementation of Job interface. Maintains the state machines of Job.
  * The read and write calls use ReadWriteLock for concurrency.
  */
@@ -119,7 +121,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final TaskHeartbeatHandler taskHeartbeatHandler;
   private final Object tasksSyncHandle = new Object();
 
-  private DAGScheduler dagScheduler;
+  @VisibleForTesting
+  DAGScheduler dagScheduler;
 
   private final EventHandler eventHandler;
   // TODO Metrics
@@ -1120,6 +1123,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       job.numCompletedVertices++;
       if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
         vertexSucceeded(job, vertex);
+        job.dagScheduler.vertexCompleted(vertex);
       } else if (vertexEvent.getVertexState() == VertexState.FAILED) {
         vertexFailed(job, vertex);
       } else if (vertexEvent.getVertexState() == VertexState.KILLED) {
@@ -1135,8 +1139,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
             + ", numVertices=" + job.numVertices);
       }
 
-      job.dagScheduler.vertexCompleted(vertex);
-
       return checkJobForCompletion(job);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ff5a3023/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 7ed2b8b..a8dd3c8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -45,9 +45,12 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate.UpdateType;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
 import org.apache.tez.dag.app.dag.event.DAGFinishEvent;
@@ -86,11 +89,22 @@ public class TestDAGImpl {
   private Clock clock = new SystemClock();
   private JobTokenSecretManager jobTokenSecretManager;
   private DAGFinishEventHandler dagFinishEventHandler;
+  private AppContext mrrAppContext;
+  private DAGPlan mrrDagPlan;
+  private DAGImpl mrrDag;
+  private TezDAGID mrrDagId;
 
   private class DagEventDispatcher implements EventHandler<DAGEvent> {
     @Override
     public void handle(DAGEvent event) {
-      dag.handle(event);
+      if (event.getDAGId().equals(dagId)) {  // route by DAG id: the fixture now owns both dag and mrrDag
+        dag.handle(event);
+      }  else if (event.getDAGId().equals(mrrDagId)) {  // the 3-vertex MRR DAG created in setup
+        mrrDag.handle(event);
+      } else {  // fail loudly instead of silently dropping an event for an unknown DAG
+        throw new RuntimeException("Invalid event, unknown dag"
+            + ", dagId=" + event.getDAGId());
+      }
     }
   }
 
@@ -112,7 +126,9 @@ public class TestDAGImpl {
     @SuppressWarnings("unchecked")
     @Override
     public void handle(VertexEvent event) {
-      Vertex vertex = dag.getVertex(event.getVertexId());
+      DAGImpl handler = event.getVertexId().getDAGId().equals(dagId) ?  // pick the DAG that owns this vertex
+          dag : mrrDag;  // NOTE(review): any id other than dagId falls through to mrrDag; unlike DagEventDispatcher, unknown ids are not rejected
+      Vertex vertex = handler.getVertex(event.getVertexId());
       ((EventHandler<VertexEvent>) vertex).handle(event);
     }
   }
@@ -127,6 +143,105 @@ public class TestDAGImpl {
     }
   }
 
+  private DAGPlan createTestMRRDAGPlan() {  // linear plan: vertex1 -e1-> vertex2 -e2-> vertex3
+    LOG.info("Setting up MRR dag plan");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("testverteximpl")  // NOTE(review): plan name looks copied from a vertex test; confirm it is intended
+        .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex1")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host1")
+                .addRack("rack1")
+                .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                    .setNumTasks(1)  // vertex1 has 1 task; vertex2 and vertex3 have 2 each
+                    .setVirtualCores(4)
+                    .setMemoryMb(1024)
+                    .setJavaOpts("")
+                    .setTaskModule("x1.y1")
+                    .build()
+                    )
+                    .addOutEdgeId("e1")
+                    .build()
+            )
+        .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex2")  // middle vertex: consumes e1, produces e2
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host2")
+                .addRack("rack2")
+                .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                    .setNumTasks(2)
+                    .setVirtualCores(4)
+                    .setMemoryMb(1024)
+                    .setJavaOpts("")
+                    .setTaskModule("x2.y2")
+                    .build()
+                    )
+                    .addInEdgeId("e1")
+                    .addOutEdgeId("e2")
+                    .build()
+            )
+        .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex3")  // NOTE(review): only vertex3 sets a processor descriptor; confirm vertex1/vertex2 do not need one
+            .setType(PlanVertexType.NORMAL)
+            .setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host3")
+                .addRack("rack3")
+                .build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                    .setNumTasks(2)
+                    .setVirtualCores(4)
+                    .setMemoryMb(1024)
+                    .setJavaOpts("foo")
+                    .setTaskModule("x3.y3")
+                    .build()
+                    )
+                    .addInEdgeId("e2")
+                    .build()
+            )
+        .addEdge(  // e1: vertex1 -> vertex2
+            EdgePlan.newBuilder()
+            .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i2"))
+            .setInputVertexName("vertex1")
+            .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
+            .setOutputVertexName("vertex2")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e1")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+       .addEdge(  // e2: vertex2 -> vertex3
+           EdgePlan.newBuilder()
+           .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i3"))
+           .setInputVertexName("vertex2")
+           .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+           .setOutputVertexName("vertex3")
+           .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+           .setId("e2")
+           .setSourceType(PlanEdgeSourceType.STABLE)
+           .build()
+           )
+      .build();
+
+    return dag;
+  }
+
   private DAGPlan createTestDAGPlan() {
     LOG.info("Setting up dag plan");
     DAGPlan dag = DAGPlan.newBuilder()
@@ -359,6 +474,15 @@ public class TestDAGImpl {
         dispatcher.getEventHandler(),  taskAttemptListener,
         jobTokenSecretManager, fsTokens, clock, "user", 10000, thh, appContext);
     doReturn(dag).when(appContext).getDAG();
+    mrrAppContext = mock(AppContext.class);
+    mrrDagId = new TezDAGID(appAttemptId.getApplicationId(), 2);
+    mrrDagPlan = createTestMRRDAGPlan();
+    mrrDag = new DAGImpl(mrrDagId, appAttemptId, conf, mrrDagPlan,
+        dispatcher.getEventHandler(),  taskAttemptListener,
+        jobTokenSecretManager, fsTokens, clock, "user", 10000, thh,
+        mrrAppContext);
+    doReturn(mrrDag).when(mrrAppContext).getDAG();
+    doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();
     vertexEventDispatcher = new VertexEventDispatcher();
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
     dagEventDispatcher = new DagEventDispatcher();
@@ -380,18 +504,18 @@ public class TestDAGImpl {
     dag = null;
   }
 
-  private void initDAG(DAGImpl dag) {
-    dag.handle(
-        new DAGEvent(dagId, DAGEventType.DAG_INIT));
-    Assert.assertEquals(DAGState.INITED, dag.getState());
+  private void initDAG(DAGImpl impl) {  // renamed from 'dag' so the parameter no longer shadows the dag field
+    impl.handle(  // delivered directly, not via the dispatcher
+        new DAGEvent(impl.getID(), DAGEventType.DAG_INIT));  // use the target DAG's own id, not the fixed dagId field
+    Assert.assertEquals(DAGState.INITED, impl.getState());
   }
 
   @SuppressWarnings("unchecked")
-  private void startDAG(DAGImpl dag) {
+  private void startDAG(DAGImpl impl) {  // works for either dag or mrrDag
     dispatcher.getEventHandler().handle(
-        new DAGEvent(dagId, DAGEventType.DAG_START));
+        new DAGEvent(impl.getID(), DAGEventType.DAG_START));  // the id lets DagEventDispatcher route to the right DAG
     dispatcher.await();
-    Assert.assertEquals(DAGState.RUNNING, dag.getState());
+    Assert.assertEquals(DAGState.RUNNING, impl.getState());
   }
 
   @Test
@@ -638,4 +762,29 @@ public class TestDAGImpl {
   public void testCounterUpdates() {
     // FIXME need to implement
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testOutofBandFailureForMRRScheduler() {  // covers a FAILED vertex completion, which DAGImpl no longer forwards to dagScheduler.vertexCompleted()
+    initDAG(mrrDag);
+    dispatcher.await();
+    Assert.assertTrue(mrrDag.dagScheduler instanceof DAGSchedulerMRR);  // the linear MRR plan is expected to use DAGSchedulerMRR
+    startDAG(mrrDag);
+    dispatcher.await();
+
+    TaskAttempt attempt = mock(TaskAttempt.class);  // mock attempt that only reports its vertex id
+    doReturn(
+        mrrDag.getVertex("vertex1").getVertexId()).when(attempt).getVertexID();
+
+    DAGEventSchedulerUpdate scheduleEvent =
+        new DAGEventSchedulerUpdate(UpdateType.TA_SCHEDULE, attempt);  // scheduler update for a vertex1 attempt
+    dispatcher.getEventHandler().handle(scheduleEvent);
+    dispatcher.await();
+
+    dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(  // report vertex2 as FAILED
+        mrrDag.getVertex("vertex2").getVertexId(), VertexState.FAILED));
+    dispatcher.await();
+
+    Assert.assertEquals(DAGState.FAILED, mrrDag.getState());  // one failed vertex must fail the whole DAG
+  }
 }


Mime
View raw message