tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [12/50] [abbrv] git commit: TEZ-109. Create tests for VertexImpl.
Date Tue, 04 Jun 2013 05:33:14 GMT
TEZ-109. Create tests for VertexImpl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/38b410e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/38b410e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/38b410e3

Branch: refs/heads/master
Commit: 38b410e3116919d31fbb9bab2ba4cc7c21cc9fee
Parents: 645c470
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu May 16 12:35:12 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu May 16 12:35:12 2013 -0700

----------------------------------------------------------------------
 pom.xml                                            |    1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |   70 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java       |  950 +++++++++++++++
 3 files changed, 1000 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/38b410e3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5c545c5..7fa336a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -294,6 +294,7 @@
         <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-surefire-plugin</artifactId>
+          <version>2.14.1</version>
           <configuration>
             <forkMode>always</forkMode>
             <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/38b410e3/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3f3ac49..aedb386 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -101,6 +101,8 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 /** Implementation of Vertex interface. Maintains the state machines of Vertex.
  * The read and write calls use ReadWriteLock for concurrency.
@@ -112,7 +114,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private static final String LINE_SEPARATOR = System
       .getProperty("line.separator");
   private static final TezDependentTaskCompletionEvent[]
-    EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TezDependentTaskCompletionEvent[0];
+      EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS =
+      new TezDependentTaskCompletionEvent[0];
 
   private static final Log LOG = LogFactory.getLog(VertexImpl.class);
 
@@ -154,7 +157,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private final List<String> diagnostics = new ArrayList<String>();
 
   //task/attempt related datastructures
-  private final Map<TezTaskID, Integer> successSourceAttemptCompletionEventNoMap =
+  @VisibleForTesting
+  final Map<TezTaskID, Integer> successSourceAttemptCompletionEventNoMap =
     new HashMap<TezTaskID, Integer>();
   private final Map<TezTaskAttemptID, Integer> fetchFailuresMapping =
     new HashMap<TezTaskAttemptID, Integer>();
@@ -214,12 +218,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
           .addTransition
               (VertexState.RUNNING,
-              EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED),
+              EnumSet.of(VertexState.RUNNING, VertexState.KILLED,
+                  VertexState.SUCCEEDED, VertexState.FAILED),
               VertexEventType.V_TASK_COMPLETED,
               new TaskCompletedTransition())
           .addTransition
               (VertexState.RUNNING,
-              EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.FAILED),
+              EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED,
+                  VertexState.FAILED),
               VertexEventType.V_COMPLETED,
               new VertexNoTasksCompletedTransition())
           .addTransition(VertexState.RUNNING, VertexState.KILL_WAIT,
@@ -308,7 +314,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // create the topology tables
           .installTopology();
 
-  private final StateMachine<VertexState, VertexEventType, VertexEvent> stateMachine;
+  private final StateMachine<VertexState, VertexEventType, VertexEvent>
+      stateMachine;
 
   //changing fields while the vertex is running
   private int numTasks;
@@ -341,10 +348,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private VertexLocationHint vertexLocationHint;
   private Map<String, LocalResource> localResources;
   private Map<String, String> environment;
-  private String javaOpts;
+  private final String javaOpts;
   
-  public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, String vertexName,
-      TezConfiguration conf, EventHandler eventHandler,
+  public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, 
+      String vertexName, TezConfiguration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials fsTokenCredentials, Clock clock,
@@ -573,9 +580,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       try {
          getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
-        addDiagnostic("Invalid event " + event.getType() +
-            " on Job " + this.vertexId);
+        String message = "Invalid event " + event.getType() +
+            " on vertex " + this.vertexId +
+            " at current state " + oldState;
+        LOG.error("Can't handle " + message, e);
+        addDiagnostic(message);
         eventHandler.handle(new VertexEvent(this.vertexId,
             VertexEventType.INTERNAL_ERROR));
       }
@@ -957,6 +966,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
       vertex.setFinishTime();
+      vertex.addDiagnostic("Vertex received Kill in NEW state.");
       vertex.finished(VertexState.KILLED);
     }
   }
@@ -966,7 +976,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
       vertex.abortVertex(VertexStatus.State.KILLED);
-      vertex.addDiagnostic("Job received Kill in INITED state.");
+      vertex.addDiagnostic("Vertex received Kill in INITED state.");
       vertex.finished(VertexState.KILLED);
     }
   }
@@ -975,7 +985,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       implements SingleArcTransition<VertexImpl, VertexEvent> {
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {
-      vertex.addDiagnostic("Job received Kill while in RUNNING state.");
+      vertex.addDiagnostic("Vertex received Kill while in RUNNING state.");
       for (Task task : vertex.tasks.values()) {
         vertex.eventHandler.handle(
             new TaskEvent(task.getTaskId(), TaskEventType.T_KILL));
@@ -1133,14 +1143,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      VertexState vertexCompleteSuccess =
-          VertexImpl.checkVertexForCompletion(vertex);
-      if (vertexCompleteSuccess != null) {
-        return vertexCompleteSuccess;
-      }
-
-      // Return the current state, Job not finished yet
-      return vertex.getInternalState();
+      return VertexImpl.checkVertexForCompletion(vertex);
     }
   }
 
@@ -1226,6 +1229,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return appContext.getDAG();
   }
   
+  @VisibleForTesting
+  String getProcessorName() {
+    return this.processorName;
+  }
+
+  @VisibleForTesting
+  String getJavaOpts() {
+    return this.javaOpts;
+  }
+
   // TODO Eventually remove synchronization.
   @Override
   public synchronized List<InputSpec> getInputSpecList() {
@@ -1261,4 +1274,19 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return outputSpecList;
   }
 
+  @VisibleForTesting
+  VertexOutputCommitter getVertexOutputCommitter() {
+    return this.committer;
+  }
+
+  @VisibleForTesting
+  // Only to be used for testing
+  void setVertexOutputCommitter(VertexOutputCommitter committer) {
+    this.committer = committer;
+  }
+
+  @VisibleForTesting
+  VertexScheduler getVertexScheduler() {
+    return this.vertexScheduler;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/38b410e3/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
new file mode 100644
index 0000000..a1e450c
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -0,0 +1,950 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.MRVertexOutputCommitter;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.dag.api.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.DAGProtos.PlanEdgeConnectionPattern;
+import org.apache.tez.dag.api.DAGProtos.PlanEdgeSourceType;
+import org.apache.tez.dag.api.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.DAGProtos.VertexPlan;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.impl.NullVertexOutputCommitter;
+import org.apache.tez.dag.api.impl.VertexContext;
+import org.apache.tez.dag.api.impl.VertexOutputCommitter;
+import org.apache.tez.dag.api.impl.VertexStatus.State;
+import org.apache.tez.dag.api.records.TaskState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
+import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.avro.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.engine.records.TezDependentTaskCompletionEvent.Status;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestVertexImpl {
+
+  private static final Log LOG = LogFactory.getLog(TestVertexImpl.class);
+
+  private TezDAGID dagId;
+  private ApplicationAttemptId appAttemptId;
+  private DAGPlan dagPlan;
+  private Map<String, VertexImpl> vertices;
+  private Map<TezVertexID, VertexImpl> vertexIdMap;
+  private DrainDispatcher dispatcher;
+  private TaskAttemptListener taskAttemptListener;
+  private Credentials fsTokens;
+  private Token<JobTokenIdentifier> jobToken;
+  private Clock clock = new SystemClock();
+  private TaskHeartbeatHandler thh;
+  private AppContext appContext;
+  private VertexLocationHint vertexLocationHint;
+  private TezConfiguration conf = new TezConfiguration();
+  private Map<String, EdgeProperty> edges;
+
+  private VertexEventDispatcher vertexEventDispatcher;
+
+  private DagEventDispatcher dagEventDispatcher;
+
+  private class CountingVertexOutputCommitter extends
+      VertexOutputCommitter {
+
+    public int initCounter = 0;
+    public int setupCounter = 0;
+    public int commitCounter = 0;
+    public int abortCounter = 0;
+    private boolean throwError;
+    private boolean throwErrorOnAbort;
+    
+    public CountingVertexOutputCommitter(boolean throwError,
+        boolean throwOnAbort) {
+      this.throwError = throwError;
+      this.throwErrorOnAbort = throwOnAbort;
+    }
+
+    public CountingVertexOutputCommitter() {
+      this(false, false);
+    }
+    
+    @Override
+    public void init(VertexContext context) throws IOException {
+      ++initCounter;
+    }
+
+    @Override
+    public void setupVertex() throws IOException {
+      ++setupCounter;
+    }
+
+    @Override
+    public void commitVertex() throws IOException {
+      ++commitCounter;
+      if (throwError) {
+        throw new IOException("I can throwz exceptions in commit");
+      }
+    }
+
+    @Override
+    public void abortVertex(State finalState) throws IOException {
+      ++abortCounter;
+      if (throwErrorOnAbort) {
+        throw new IOException("I can throwz exceptions in abort");
+      }
+    }    
+  }
+  
+  private class DagEventDispatcher implements EventHandler<DAGEvent> {
+    @Override
+    public void handle(DAGEvent event) {
+    }
+  }
+
+  private class HistoryHandler implements EventHandler<DAGHistoryEvent> {
+    @Override
+    public void handle(DAGHistoryEvent event) {
+    }
+  }
+  
+  private class VertexEventDispatcher
+      implements EventHandler<VertexEvent> {
+
+    @Override
+    public void handle(VertexEvent event) {
+      VertexImpl vertex = vertexIdMap.get(event.getVertexId());
+      ((EventHandler<VertexEvent>) vertex).handle(event);
+    }
+  }
+
+
+  private DAGPlan createTestDAGPlan() {
+    LOG.info("Setting up dag plan");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("testverteximpl")
+        .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex1")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host1")
+                .addRack("rack1")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(0)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("x1.y1")
+                .build()
+                )
+            .addOutEdgeId("e1")
+            .build()
+            )
+        .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex2")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host2")
+                .addRack("rack2")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("x2.y2")
+                .build()
+                )
+            .addOutEdgeId("e2")
+            .build()
+            )
+        .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex3")
+            .setType(PlanVertexType.NORMAL)
+            .setProcessorName("x3.y3")
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host3")
+                .addRack("rack3")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("foo")
+                .setTaskModule("x3.y3")
+                .build()
+                )
+            .addInEdgeId("e1")
+            .addInEdgeId("e2")
+            .addOutEdgeId("e3")
+            .addOutEdgeId("e4")
+            .build()
+            )
+        .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex4")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host4")
+                .addRack("rack4")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("x4.y4")
+                .build()
+                )
+            .addInEdgeId("e3")
+            .addOutEdgeId("e5")
+            .build()
+            )
+        .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex5")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host5")
+                .addRack("rack5")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("x5.y5")
+                .build()
+                )
+            .addInEdgeId("e4")
+            .addOutEdgeId("e6")
+            .build()
+            )
+        .addVertex(
+            VertexPlan.newBuilder()
+            .setName("vertex6")
+            .setType(PlanVertexType.NORMAL)
+            .addTaskLocationHint(
+                PlanTaskLocationHint.newBuilder()
+                .addHost("host6")
+                .addRack("rack6")
+                .build()
+                )
+            .setTaskConfig(
+                PlanTaskConfiguration.newBuilder()
+                .setNumTasks(2)
+                .setVirtualCores(4)
+                .setMemoryMb(1024)
+                .setJavaOpts("")
+                .setTaskModule("x6.y6")
+                .build()
+                )
+            .addInEdgeId("e5")
+            .addInEdgeId("e6")
+            .build()
+            )
+        .addEdge(
+            EdgePlan.newBuilder()
+            .setInputClass("i3_v1")
+            .setInputVertexName("vertex1")
+            .setOutputClass("o1")
+            .setOutputVertexName("vertex3")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e1")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .addEdge(
+            EdgePlan.newBuilder()
+            .setInputClass("i3_v2")
+            .setInputVertexName("vertex2")
+            .setOutputClass("o2")
+            .setOutputVertexName("vertex3")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e2")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .addEdge(
+            EdgePlan.newBuilder()
+            .setInputClass("i4_v3")
+            .setInputVertexName("vertex3")
+            .setOutputClass("o3_v4")
+            .setOutputVertexName("vertex4")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e3")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .addEdge(
+            EdgePlan.newBuilder()
+            .setInputClass("i5_v3")
+            .setInputVertexName("vertex3")
+            .setOutputClass("o3_v5")
+            .setOutputVertexName("vertex5")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e4")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .addEdge(
+            EdgePlan.newBuilder()
+            .setInputClass("i6_v4")
+            .setInputVertexName("vertex4")
+            .setOutputClass("o4")
+            .setOutputVertexName("vertex6")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e5")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .addEdge(
+            EdgePlan.newBuilder()
+            .setInputClass("i6_v5")
+            .setInputVertexName("vertex5")
+            .setOutputClass("o5")
+            .setOutputVertexName("vertex6")
+            .setConnectionPattern(PlanEdgeConnectionPattern.BIPARTITE)
+            .setId("e6")
+            .setSourceType(PlanEdgeSourceType.STABLE)
+            .build()
+            )
+        .build();
+
+    return dag;
+  }
+
+  private void setupVertices() {
+    int vCnt = dagPlan.getVertexCount();
+    LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt);
+    vertices = new HashMap<String, VertexImpl>();
+    vertexIdMap = new HashMap<TezVertexID, VertexImpl>();
+    for (int i = 0; i < vCnt; ++i) {
+      VertexPlan vPlan = dagPlan.getVertex(i);
+      TezVertexID vertexId = new TezVertexID(dagId, i+1);
+      VertexImpl v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
+          dispatcher.getEventHandler(), taskAttemptListener, jobToken, fsTokens,
+          clock, thh, appContext, vertexLocationHint);
+      vertices.put(vPlan.getName(), v);
+      vertexIdMap.put(vertexId, v);
+    }
+  }
+
+  private void parseVertexEdges() {
+    LOG.info("Parsing edges from dag plan, edgeCount="
+        + dagPlan.getEdgeCount());
+    int vCnt = dagPlan.getVertexCount();
+    Map<String, EdgePlan> edgePlans =
+        DagTypeConverters.createEdgePlanMapFromDAGPlan(dagPlan.getEdgeList());
+
+    for (int i = 0; i < vCnt; ++i) {
+      VertexPlan vertexPlan = dagPlan.getVertex(i);
+      Vertex vertex = vertices.get(vertexPlan.getName());
+      Map<Vertex, EdgeProperty> inVertices =
+          new HashMap<Vertex, EdgeProperty>();
+
+      Map<Vertex, EdgeProperty> outVertices =
+          new HashMap<Vertex, EdgeProperty>();
+      
+      for(String inEdgeId : vertexPlan.getInEdgeIdList()){
+        EdgePlan edgePlan = edgePlans.get(inEdgeId);
+        Vertex inVertex = this.vertices.get(edgePlan.getInputVertexName());
+        EdgeProperty edgeProp = this.edges.get(inEdgeId);
+        inVertices.put(inVertex, edgeProp);
+      }
+
+      for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
+        EdgePlan edgePlan = edgePlans.get(outEdgeId);
+        Vertex outVertex = this.vertices.get(edgePlan.getOutputVertexName());
+        EdgeProperty edgeProp = this.edges.get(outEdgeId);
+        outVertices.put(outVertex, edgeProp);
+      }
+      LOG.info("Setting input vertices for vertex " + vertex.getName()
+          + ", inputVerticesCnt=" + inVertices.size());
+      vertex.setInputVertices(inVertices);
+      LOG.info("Setting output vertices for vertex " + vertex.getName()
+          + ", outputVerticesCnt=" + outVertices.size());
+      vertex.setOutputVertices(outVertices);
+    }
+  }
+
+  @Before
+  public void setup() {
+    appAttemptId = BuilderUtils.newApplicationAttemptId(
+        BuilderUtils.newApplicationId(100, 1), 1);
+    dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
+    dagPlan = createTestDAGPlan();
+    dispatcher = new DrainDispatcher();
+    fsTokens = new Credentials();
+    jobToken = new Token<JobTokenIdentifier>();
+    appContext = mock(AppContext.class);
+    DAG dag = mock(DAG.class);
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+    doReturn(dag).when(appContext).getDAG();
+    doReturn(dagId).when(appContext).getDAGID();
+    doReturn(dagId).when(dag).getID();
+    setupVertices();
+    edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(
+        dagPlan.getEdgeList());
+    parseVertexEdges();
+    vertexEventDispatcher = new VertexEventDispatcher();
+    dispatcher.register(VertexEventType.class, vertexEventDispatcher);
+    dagEventDispatcher = new DagEventDispatcher();
+    dispatcher.register(DAGEventType.class, dagEventDispatcher);
+    dispatcher.register(HistoryEventType.class,
+        new HistoryHandler());
+    dispatcher.init(conf);
+    dispatcher.start();
+  }
+
+  @After
+  public void teardown() {
+    dagPlan = null;
+    this.vertices = null;
+    this.edges = null;
+    dispatcher.await();
+    dispatcher.stop();
+  }
+
+  private void initVertex(VertexImpl v) {
+    Assert.assertEquals(VertexState.NEW, v.getState());
+    v.handle(new VertexEvent(v.getVertexId(),
+          VertexEventType.V_INIT));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITED, v.getState());
+  }
+
+  private void startVertex(VertexImpl v) {
+    startVertex(v, true);
+  }
+
+  private void killVertex(VertexImpl v, boolean checkKillWait) {
+    v.handle(new VertexEvent(v.getVertexId(), VertexEventType.V_KILL));
+    dispatcher.await();
+    if (checkKillWait) {
+      Assert.assertEquals(VertexState.KILL_WAIT, v.getState());
+    } else {
+      Assert.assertEquals(VertexState.KILLED, v.getState());      
+    }
+  }
+  
+  private void startVertex(VertexImpl v,
+      boolean checkRunningState) {
+    Assert.assertEquals(VertexState.INITED, v.getState());
+    v.handle(new VertexEvent(v.getVertexId(),
+          VertexEventType.V_START));
+    dispatcher.await();
+    if (checkRunningState) {
+      Assert.assertEquals(VertexState.RUNNING, v.getState());
+    }
+  }
+
+  @Test
+  public void testVertexInit() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+
+    VertexImpl v3 = vertices.get("vertex3");
+    initVertex(v3);
+    
+    Assert.assertEquals("x3.y3", v3.getProcessorName());
+    Assert.assertEquals("foo", v3.getJavaOpts());
+    
+    Assert.assertEquals(2, v3.getInputSpecList().size());
+    Assert.assertEquals(2, v3.getInputVerticesCount());
+    Assert.assertEquals(2, v3.getOutputVerticesCount());
+    Assert.assertEquals(2, v3.getOutputVerticesCount());
+
+    Assert.assertTrue("vertex1".equals(v3.getInputSpecList().get(0)
+        .getVertexName())
+        || "vertex2".equals(v3.getInputSpecList().get(0)
+            .getVertexName()));
+    Assert.assertTrue("vertex1".equals(v3.getInputSpecList().get(1)
+        .getVertexName())
+        || "vertex2".equals(v3.getInputSpecList().get(1)
+            .getVertexName()));
+    Assert.assertTrue("i3_v1".equals(v3.getInputSpecList().get(0)
+        .getInputClassName())
+        || "i3_v2".equals(v3.getInputSpecList().get(0)
+            .getInputClassName()));
+    Assert.assertTrue("i3_v1".equals(v3.getInputSpecList().get(1)
+        .getInputClassName())
+        || "i3_v2".equals(v3.getInputSpecList().get(1)
+            .getInputClassName()));
+    
+    Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(0)
+        .getVertexName())
+        || "vertex5".equals(v3.getOutputSpecList().get(0)
+            .getVertexName()));
+    Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(1)
+        .getVertexName())
+        || "vertex5".equals(v3.getOutputSpecList().get(1)
+            .getVertexName()));
+    Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList().get(0)
+        .getOutputClassName())
+        || "o3_v5".equals(v3.getOutputSpecList().get(0)
+            .getOutputClassName()));
+    Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList().get(1)
+        .getOutputClassName())
+        || "o3_v5".equals(v3.getOutputSpecList().get(1)
+            .getOutputClassName()));
+  }
+
+  @Test
+  public void testVertexStart() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    startVertex(v);
+  }
+
+  @Test
+  public void testBasicVertexCompletion() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    startVertex(v);
+
+    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+    Assert.assertEquals(1, v.getCompletedTasks());
+
+    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+    Assert.assertEquals(2, v.getCompletedTasks());
+  }
+
+  @Test
+  @Ignore // FIXME fix verteximpl for this test to work
+  public void testDuplicateTaskCompletion() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    startVertex(v);
+
+    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+    
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+
+    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+  }
+
+
+  @Test
+  public void testVertexFailure() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    startVertex(v);
+
+    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.FAILED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.FAILED, v.getState());
+    String diagnostics =
+        StringUtils.join(",", v.getDiagnostics()).toLowerCase();
+    Assert.assertTrue(diagnostics.contains("task failed " + t1.toString()));
+  }
+  
+  @Test
+  public void testVertexWithNoTasks() {
+    // FIXME a vertex with no tasks should not be allowed
+    VertexImpl v = vertices.get("vertex1");
+    initVertex(v);
+    startVertex(v, false);
+    dispatcher.await();
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+  }
+
+  @Test
+  public void testVertexKill() {
+    VertexImpl v1 = vertices.get("vertex1");
+    killVertex(v1, false);
+    String diagnostics =
+        StringUtils.join(",", v1.getDiagnostics()).toLowerCase();
+    Assert.assertTrue(diagnostics.contains(
+        "vertex received kill in new state"));
+    
+    VertexImpl v2 = vertices.get("vertex2");
+    initVertex(v2);
+    killVertex(v2, false);
+    diagnostics =
+        StringUtils.join(",", v2.getDiagnostics()).toLowerCase();
+    LOG.info("diagnostics v2: " + diagnostics);
+    Assert.assertTrue(diagnostics.contains(
+        "vertex received kill in inited state"));
+
+    VertexImpl v3 = vertices.get("vertex3");
+    initVertex(v3);
+    startVertex(v3);
+    killVertex(v3, true);
+    diagnostics =
+        StringUtils.join(",", v3.getDiagnostics()).toLowerCase();
+    Assert.assertTrue(diagnostics.contains(
+        "vertex received kill while in running state"));
+  }
+
+  @Test
+  public void testKilledTasksHandling() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    startVertex(v);
+
+    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.KILLED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+
+    v.handle(new VertexEventTaskCompleted(t2, TaskState.KILLED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.KILLED, v.getState());
+  }
+
+  @Test
+  public void testVertexCommitterInit() {
+    VertexImpl v2 = vertices.get("vertex2");
+    initVertex(v2);
+    Assert.assertTrue(v2.getVertexOutputCommitter()
+        instanceof NullVertexOutputCommitter);
+    
+    VertexImpl v6 = vertices.get("vertex6");
+    initVertex(v6);
+    Assert.assertTrue(v6.getVertexOutputCommitter()
+        instanceof MRVertexOutputCommitter);
+  }
+
+  @Test
+  public void testVertexSchedulerInit() {
+    VertexImpl v2 = vertices.get("vertex2");
+    initVertex(v2);
+    Assert.assertTrue(v2.getVertexScheduler()
+        instanceof ImmediateStartVertexScheduler);
+    
+    VertexImpl v6 = vertices.get("vertex6");
+    initVertex(v6);
+    Assert.assertTrue(v6.getVertexScheduler()
+        instanceof BipartiteSlowStartVertexScheduler);
+  }
+  
+  @Test
+  public void testVertexTaskFailure() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    CountingVertexOutputCommitter committer =
+        new CountingVertexOutputCommitter();
+    v.setVertexOutputCommitter(committer);
+    startVertex(v);
+
+    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());
+    
+    v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED));
+    v.handle(new VertexEventTaskCompleted(t2, TaskState.FAILED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.FAILED, v.getState());
+    Assert.assertEquals(0, committer.commitCounter);
+    Assert.assertEquals(1, committer.abortCounter);
+  }
+
+  @Test
+  public void testSourceVertexStartHandling() {
+    VertexImpl v4 = vertices.get("vertex4");
+    initVertex(v4);
+    VertexImpl v5 = vertices.get("vertex5");
+    initVertex(v5);
+    VertexImpl v6 = vertices.get("vertex6");
+    initVertex(v6);
+    Assert.assertEquals(VertexState.INITED, v6.getState());
+
+    startVertex(v4);
+    startVertex(v5);
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v6.getState());
+    Assert.assertEquals(1, v6.getDistanceFromRoot());
+  }
+
+  @Test
+  public void testCounters() {
+    // FIXME need to test counters at vertex level
+  }
+
+  @Test
+  public void testDiagnostics() {
+    // FIXME need to test diagnostics in various cases
+  }
+  
+  @Test
+  public void testTaskAttemptCompletionEvents() {    
+    // FIXME need to test handling of task attempt events
+  }
+
+  @Test
+  public void testSourceTaskAttemptCompletionEvents() {
+    VertexImpl v4 = vertices.get("vertex4");
+    initVertex(v4);
+    VertexImpl v5 = vertices.get("vertex5");
+    initVertex(v5);
+    VertexImpl v6 = vertices.get("vertex6");
+    initVertex(v6);
+
+    startVertex(v4);
+    startVertex(v5);
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v6.getState());
+
+    TezTaskID t1_v4 = new TezTaskID(v4.getVertexId(), 0);
+    TezTaskID t2_v4 = new TezTaskID(v4.getVertexId(), 1);
+    TezTaskID t1_v5 = new TezTaskID(v5.getVertexId(), 0);
+    TezTaskID t2_v5 = new TezTaskID(v5.getVertexId(), 1);
+
+    TezTaskAttemptID ta1_t1_v4 = new TezTaskAttemptID(t1_v4, 0);
+    TezTaskAttemptID ta2_t1_v4 = new TezTaskAttemptID(t1_v4, 0);
+    TezTaskAttemptID ta1_t2_v4 = new TezTaskAttemptID(t2_v4, 0);
+    TezTaskAttemptID ta1_t1_v5 = new TezTaskAttemptID(t1_v5, 0);
+    TezTaskAttemptID ta1_t2_v5 = new TezTaskAttemptID(t2_v5, 0);
+    TezTaskAttemptID ta2_t2_v5 = new TezTaskAttemptID(t2_v5, 0);
+
+    TezDependentTaskCompletionEvent cEvt1 =
+        new TezDependentTaskCompletionEvent(1, ta1_t1_v4,
+            Status.FAILED, "", 3);
+    TezDependentTaskCompletionEvent cEvt2 =
+        new TezDependentTaskCompletionEvent(2, ta2_t1_v4,
+            Status.SUCCEEDED, "", 4);
+    TezDependentTaskCompletionEvent cEvt3 =
+        new TezDependentTaskCompletionEvent(2, ta1_t2_v4,
+            Status.SUCCEEDED, "", 5);
+    TezDependentTaskCompletionEvent cEvt4 =
+        new TezDependentTaskCompletionEvent(2, ta1_t1_v5,
+            Status.SUCCEEDED, "", 5);
+    TezDependentTaskCompletionEvent cEvt5 =
+        new TezDependentTaskCompletionEvent(1, ta1_t2_v5,
+            Status.FAILED, "", 3);
+    TezDependentTaskCompletionEvent cEvt6 =
+        new TezDependentTaskCompletionEvent(2, ta2_t2_v5,
+            Status.SUCCEEDED, "", 4);
+
+    v4.handle(new VertexEventTaskAttemptCompleted(cEvt1));
+    v4.handle(new VertexEventTaskAttemptCompleted(cEvt2));
+    v4.handle(new VertexEventTaskAttemptCompleted(cEvt3));
+    v5.handle(new VertexEventTaskAttemptCompleted(cEvt4));
+    v5.handle(new VertexEventTaskAttemptCompleted(cEvt5));
+    v5.handle(new VertexEventTaskAttemptCompleted(cEvt6));    
+
+    v4.handle(new VertexEventTaskCompleted(t1_v4, TaskState.SUCCEEDED));
+    v4.handle(new VertexEventTaskCompleted(t2_v4, TaskState.SUCCEEDED));
+    v5.handle(new VertexEventTaskCompleted(t1_v5, TaskState.SUCCEEDED));
+    v5.handle(new VertexEventTaskCompleted(t2_v5, TaskState.SUCCEEDED));
+    dispatcher.await();
+
+    Assert.assertEquals(VertexState.SUCCEEDED, v4.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v5.getState());
+
+    Assert.assertEquals(VertexState.RUNNING, v6.getState());
+    Assert.assertEquals(4, v6.successSourceAttemptCompletionEventNoMap.size());
+    Assert.assertEquals(6, v6.getTaskAttemptCompletionEvents(0, 100).length);
+  }
+
+  @Test
+  public void testDAGEventGeneration() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    startVertex(v);
+
+    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+
+  }
+
+  @Test
+  public void testTaskReschedule() {
+    // For downstream failures
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    CountingVertexOutputCommitter committer =
+        new CountingVertexOutputCommitter();
+    v.setVertexOutputCommitter(committer);
+    
+    startVertex(v);
+
+    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    v.handle(new VertexEventTaskReschedule(t1));
+    // FIXME need to handle dups
+    // v.handle(new VertexEventTaskReschedule(t1));
+    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.RUNNING, v.getState());    
+    Assert.assertEquals(0, committer.commitCounter);
+    
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());    
+    Assert.assertEquals(1, committer.commitCounter);
+    
+  }
+
+  @Test
+  public void testVertexCommit() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    CountingVertexOutputCommitter committer =
+        new CountingVertexOutputCommitter();
+    v.setVertexOutputCommitter(committer);
+    
+    startVertex(v);
+
+    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());    
+    Assert.assertEquals(1, committer.commitCounter);
+    
+    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());    
+    Assert.assertEquals(1, committer.commitCounter);
+    Assert.assertEquals(0, committer.abortCounter);
+    Assert.assertEquals(0, committer.initCounter); // already done in init
+    Assert.assertEquals(0, committer.setupCounter); // already done in init
+  }
+  
+  @Test
+  public void testCommitterInitAndSetup() {
+    // FIXME need to add a test for this
+  }
+  
+  @Test
+  public void testTaskAttemptFetchFailureHandling() {
+    // FIXME needs testing
+  }
+
+  @Test
+  public void testBadCommitter() {
+    VertexImpl v = vertices.get("vertex2");
+    initVertex(v);
+    CountingVertexOutputCommitter committer =
+        new CountingVertexOutputCommitter(true, true);
+    v.setVertexOutputCommitter(committer);
+    
+    startVertex(v);
+
+    TezTaskID t1 = new TezTaskID(v.getVertexId(), 0);
+    TezTaskID t2 = new TezTaskID(v.getVertexId(), 1);
+
+    v.handle(new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
+    v.handle(new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.FAILED, v.getState());    
+    Assert.assertEquals(1, committer.commitCounter);
+    
+    // FIXME need to verify whether abort needs to be called if commit fails
+    Assert.assertEquals(0, committer.abortCounter);
+    Assert.assertEquals(0, committer.initCounter); // already done in init
+    Assert.assertEquals(0, committer.setupCounter); // already done in init  
+  }
+  
+}


Mime
View raw message