tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-543. Add unit tests for Input initialization. (sseth)
Date Tue, 05 Nov 2013 21:20:22 GMT
Updated Branches:
  refs/heads/master 65e09f847 -> 4eabbf714


TEZ-543. Add unit tests for Input initialization. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/4eabbf71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/4eabbf71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/4eabbf71

Branch: refs/heads/master
Commit: 4eabbf714eb667bcdd7a45c7c50476a7c96a60c7
Parents: 65e09f8
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Nov 5 13:19:59 2013 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Nov 5 13:19:59 2013 -0800

----------------------------------------------------------------------
 .../dag/app/dag/RootInputInitializerRunner.java |  20 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  22 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 298 ++++++++++++++++++-
 3 files changed, 327 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4eabbf71/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
index e7fc7aa..183f7b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
@@ -36,6 +36,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -75,10 +76,15 @@ public class RootInputInitializerRunner {
       ListenableFuture<List<Event>> future = executor
           .submit(new InputInitializerCallable(input, vertexID, dagName,
               vertexName, numTasks));
-      Futures.addCallback(future, new InputInitializerCallback(input.getEntityName()));
+      Futures.addCallback(future, createInputInitializerCallback(input.getEntityName()));
     }
   }
 
+  @VisibleForTesting
+  protected InputInitializerCallback createInputInitializerCallback(String entityName) {
+    return new InputInitializerCallback(entityName, eventHandler, vertexID);
+  }
+  
   public void shutdown() {
     if (executor != null && !isStopped) {
       // Don't really care about what is running if an error occurs. If no error
@@ -127,12 +133,20 @@ public class RootInputInitializerRunner {
     }
   }
 
-  private class InputInitializerCallback implements FutureCallback<List<Event>>
{
+  @SuppressWarnings("rawtypes")
+  @VisibleForTesting
+  private static class InputInitializerCallback implements
+      FutureCallback<List<Event>> {
 
     private final String inputName;
+    private final EventHandler eventHandler;
+    private final TezVertexID vertexID;
 
-    public InputInitializerCallback(String inputName) {
+    public InputInitializerCallback(String inputName,
+        EventHandler eventHandler, TezVertexID vertexID) {
       this.inputName = inputName;
+      this.eventHandler = eventHandler;
+      this.vertexID = vertexID;
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4eabbf71/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9c121e9..df99bb7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1284,8 +1284,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             + " tasks. Headroom: " + totalResource + " Task Resource: " 
             + taskResource + " waves: " + waves);
 
-        vertex.rootInputInitializer = new RootInputInitializerRunner(vertex
-            .getDAG().getName(), vertex.getName(), vertex.getVertexId(),
+        vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
+            vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
             vertex.eventHandler, numTasks);
         List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
             .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
@@ -1302,6 +1302,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   } // end of InitTransition
   
+  @VisibleForTesting
+  protected RootInputInitializerRunner createRootInputInitializerRunner(
+      String dagName, String vertexName, TezVertexID vertexID,
+      EventHandler eventHandler, int numTasks) {
+    return new RootInputInitializerRunner(dagName, vertexName, vertexID,
+        eventHandler, numTasks);
+  }
+  
   public static class RootInputInitializedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
@@ -1973,6 +1981,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   String getJavaOpts() {
     return this.javaOpts;
   }
+  
+  @VisibleForTesting
+  RootInputInitializerRunner getRootInputInitializerRunner() {
+    return this.rootInputInitializer;
+  }
+  
+  @VisibleForTesting
+  VertexLocationHint getVertexLocationHint() {
+    return this.vertexLocationHint;
+  }
 
   // TODO Eventually remove synchronization.
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4eabbf71/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index dadc0e8..4a479e3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -34,14 +35,17 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.committer.NullVertexOutputCommitter;
 import org.apache.tez.dag.api.committer.VertexContext;
@@ -56,6 +60,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
@@ -63,6 +68,7 @@ import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.EdgeManager;
+import org.apache.tez.dag.app.dag.RootInputInitializerRunner;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -73,27 +79,38 @@ import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventType;
 import org.apache.tez.dag.app.dag.event.VertexEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
+import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.avro.HistoryEventType;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.internal.util.collections.Sets;
+
+import com.google.common.collect.Lists;
 
 public class TestVertexImpl {
 
   private static final Log LOG = LogFactory.getLog(TestVertexImpl.class);
 
+  private boolean useCustomInitializer = false;
+  
   private TezDAGID dagId;
   private ApplicationAttemptId appAttemptId;
   private DAGPlan dagPlan;
@@ -231,6 +248,77 @@ public class TestVertexImpl {
     return dag;
   }
 
+  private DAGPlan createDAGPlanWithInputInitializer(String initializerClassName) {
+    LOG.info("Setting up invalid dag plan with input initializer");
+    DAGPlan dag = DAGPlan.newBuilder()
+        .setName("testVertexWithInitializer")
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex1")
+                .setType(PlanVertexType.NORMAL)
+                .addInputs(
+                    RootInputLeafOutputProto.newBuilder()
+                    .setInitializerClassName(initializerClassName)
+                    .setName("input1")
+                    .setEntityDescriptor(
+                        TezEntityDescriptorProto.newBuilder()
+                            .setClassName("InputClazz")
+                            .build()
+                    ).build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                    .setNumTasks(-1)
+                    .setVirtualCores(4)
+                    .setMemoryMb(1024)
+                    .setJavaOpts("")
+                    .setTaskModule("x1.y1")
+                    .build()
+                )
+                .addOutEdgeId("e1")
+            .build()
+        )
+        .addVertex(
+            VertexPlan.newBuilder()
+                .setName("vertex2")
+                .setType(PlanVertexType.NORMAL)
+                .addInputs(
+                    RootInputLeafOutputProto.newBuilder()
+                    .setInitializerClassName(initializerClassName)
+                    .setName("input2")
+                    .setEntityDescriptor(
+                        TezEntityDescriptorProto.newBuilder()
+                            .setClassName("InputClazz")
+                            .build()
+                    ).build()
+                )
+                .setTaskConfig(
+                    PlanTaskConfiguration.newBuilder()
+                    .setNumTasks(-1)
+                    .setVirtualCores(4)
+                    .setMemoryMb(1024)
+                    .setJavaOpts("")
+                    .setTaskModule("x2.y2")
+                    .build()
+                )
+                .addInEdgeId("e1")
+            .build()
+        )
+        .addEdge(
+            EdgePlan.newBuilder()
+                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+                .setInputVertexName("vertex1")
+                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+                .setOutputVertexName("vertex2")
+                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
+                .setId("e1")
+                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+                .build()
+            )
+    .build();
+    return dag;
+  }
 
   private DAGPlan createTestDAGPlan() {
     LOG.info("Setting up dag plan");
@@ -468,9 +556,16 @@ public class TestVertexImpl {
       VertexPlan vPlan = dagPlan.getVertex(i);
       String vName = vPlan.getName();
       TezVertexID vertexId = new TezVertexID(dagId, i+1);
-      VertexImpl v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
+      VertexImpl v;
+      if (useCustomInitializer) {
+        v = new VertexImplWithCustomInitializer(vertexId, vPlan, vPlan.getName(), conf,
           dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
-          clock, thh, appContext, vertexLocationHint);
+          clock, thh, appContext, vertexLocationHint, dispatcher);
+      } else {
+        v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
+            dispatcher.getEventHandler(), taskAttemptListener, fsTokens,
+            clock, thh, appContext, vertexLocationHint);
+      }
       vertices.put(vName, v);
       vertexIdMap.put(vertexId, v);
     }
@@ -519,24 +614,27 @@ public class TestVertexImpl {
     }
   }
 
-  @Before
-  public void setup() {
+  public void setupPreDagCreation() {
     conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     appAttemptId = ApplicationAttemptId.newInstance(
         ApplicationId.newInstance(100, 1), 1);
     dagId = new TezDAGID(appAttemptId.getApplicationId(), 1);
-    dagPlan = createTestDAGPlan();
-    invalidDagPlan = createInvalidDAGPlan();
+  }
+
+  public void setupPostDagCreation() {
     dispatcher = new DrainDispatcher();
     fsTokens = new Credentials();
     appContext = mock(AppContext.class);
+    TaskSchedulerEventHandler taskScheduler = mock(TaskSchedulerEventHandler.class);
     DAG dag = mock(DAG.class);
     doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
     doReturn(dag).when(appContext).getCurrentDAG();
     doReturn(DAGPlan.getDefaultInstance()).when(dag).getJobPlan();
     doReturn(dagId).when(appContext).getCurrentDAGID();
     doReturn(dagId).when(dag).getID();
+    doReturn(taskScheduler).when(appContext).getTaskScheduler();
+    doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources();
     setupVertices();
     
     // TODO - this test logic is tightly linked to impl DAGImpl code.
@@ -557,7 +655,15 @@ public class TestVertexImpl {
     dispatcher.register(HistoryEventType.class, new HistoryHandler());
     dispatcher.init(conf);
     dispatcher.start();
-
+  }
+  
+  @Before
+  public void setup() {
+    useCustomInitializer = false;
+    setupPreDagCreation();
+    dagPlan = createTestDAGPlan();
+    invalidDagPlan = createInvalidDAGPlan();
+    setupPostDagCreation();
   }
 
   @After
@@ -568,6 +674,7 @@ public class TestVertexImpl {
     vertexEventDispatcher = null;
     dagEventDispatcher = null;
     dagPlan = null;
+    invalidDagPlan = null;
     this.vertices = null;
     this.edges = null;
     this.vertexIdMap = null;
@@ -674,7 +781,7 @@ public class TestVertexImpl {
     startVertex(v);
   }
 
-  @Test//(timeout = 5000)
+  @Test(timeout = 5000)
   public void testVertexSetParallelism() {
     VertexImpl v3 = vertices.get("vertex3");
     initVertex(v3);
@@ -1241,6 +1348,95 @@ public class TestVertexImpl {
             DAGEventType.INTERNAL_ERROR).intValue());
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testVertexWithInitializerFailure() {
+    useCustomInitializer = true;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
+    setupPostDagCreation();
+
+    VertexImplWithCustomInitializer v1 = (VertexImplWithCustomInitializer) vertices
+        .get("vertex1");
+    dispatcher.getEventHandler().handle(
+        new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+    RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
+    runner1.failInputInitialization();
+
+    Assert.assertEquals(VertexState.FAILED, v1.getState());
+    Assert.assertEquals(RootInputVertexManager.class.getName(), v1
+        .getVertexScheduler().getClass().getName());
+    Assert.assertEquals(true, runner1.hasShutDown);
+    
+    VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
+    dispatcher.getEventHandler().handle(
+        new VertexEvent(v2.getVertexId(), VertexEventType.V_INIT));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
+    RootInputInitializerRunnerControlled runner2 = v2.getRootInputInitializerRunner();
+    runner2.failInputInitialization();
+    
+    Assert.assertEquals(VertexState.FAILED, v2.getState());
+    Assert.assertEquals(RootInputVertexManager.class.getName(), v2
+        .getVertexScheduler().getClass().getName());
+    Assert.assertEquals(true, runner2.hasShutDown);
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout = 5000)
+  public void testVertexWithInitializerSuccess() {
+    useCustomInitializer = true;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
+    setupPostDagCreation();
+
+    VertexImplWithCustomInitializer v1 = (VertexImplWithCustomInitializer) vertices
+        .get("vertex1");
+    dispatcher.getEventHandler().handle(
+        new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+    RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
+    List<TaskLocationHint> v1Hints = createTaskLocationHints(5);
+    runner1.completeInputInitialization(5, v1Hints);
+
+    Assert.assertEquals(VertexState.INITED, v1.getState());
+    Assert.assertEquals(5, v1.getTotalTasks());
+    Assert.assertEquals(RootInputVertexManager.class.getName(), v1
+        .getVertexScheduler().getClass().getName());
+    Assert.assertEquals(v1Hints, v1.getVertexLocationHint().getTaskLocationHints());
+    Assert.assertEquals(true, runner1.hasShutDown);
+    
+    VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
+    dispatcher.getEventHandler().handle(
+        new VertexEvent(v2.getVertexId(), VertexEventType.V_INIT));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
+    RootInputInitializerRunnerControlled runner2 = v2.getRootInputInitializerRunner();
+    List<TaskLocationHint> v2Hints = createTaskLocationHints(10);
+    runner2.completeInputInitialization(10, v2Hints);
+    
+    Assert.assertEquals(VertexState.INITED, v2.getState());
+    Assert.assertEquals(10, v2.getTotalTasks());
+    Assert.assertEquals(RootInputVertexManager.class.getName(), v2
+        .getVertexScheduler().getClass().getName());
+    Assert.assertEquals(v2Hints, v2.getVertexLocationHint().getTaskLocationHints());
+    Assert.assertEquals(true, runner2.hasShutDown);
+  }
+  
+  private List<TaskLocationHint> createTaskLocationHints(int numTasks) {
+    List<TaskLocationHint> locationHints = Lists
+        .newArrayListWithCapacity(numTasks);
+    for (int i = 0; i < numTasks; i++) {
+      TaskLocationHint taskLocationHint = new TaskLocationHint(
+          Sets.newSet("host" + i), null);
+      locationHints.add(taskLocationHint);
+    }
+    return locationHints;
+  }
+
   @Test
   public void testVertexWithNoTasks() {
     TezVertexID vId = null;
@@ -1266,4 +1462,90 @@ public class TestVertexImpl {
     }
   }
 
+  @SuppressWarnings("rawtypes")
+  private static class VertexImplWithCustomInitializer extends VertexImpl {
+    
+    private final DrainDispatcher dispatcher;
+    private RootInputInitializerRunnerControlled rootInputInitializerRunner;
+    
+    public VertexImplWithCustomInitializer(TezVertexID vertexId,
+        VertexPlan vertexPlan, String vertexName, Configuration conf,
+        EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
+        Credentials credentials, Clock clock, TaskHeartbeatHandler thh,
+        AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher)
{
+      super(vertexId, vertexPlan, vertexName, conf, eventHandler,
+          taskAttemptListener, credentials, clock, thh, appContext,
+          vertexLocationHint);
+      this.dispatcher = dispatcher;
+    }
+
+    @Override
+    protected RootInputInitializerRunner createRootInputInitializerRunner(
+        String dagName, String vertexName, TezVertexID vertexID,
+        EventHandler eventHandler, int numTasks) {
+      rootInputInitializerRunner = new RootInputInitializerRunnerControlled(dagName, vertexName,
vertexID,
+          eventHandler, numTasks, dispatcher);
+      return rootInputInitializerRunner;
+    }
+    
+    RootInputInitializerRunnerControlled getRootInputInitializerRunner() {
+      return rootInputInitializerRunner;
+    }
+  }
+  
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private static class RootInputInitializerRunnerControlled extends
+      RootInputInitializerRunner {
+
+    private List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs;
+    private final EventHandler eventHandler;
+    private final DrainDispatcher dispatcher;
+    private final TezVertexID vertexID;
+    private volatile boolean hasShutDown = false;
+
+    public RootInputInitializerRunnerControlled(String dagName,
+        String vertexName, TezVertexID vertexID, EventHandler eventHandler,
+        int numTasks, DrainDispatcher dispatcher) {
+      super(dagName, vertexName, vertexID, eventHandler, numTasks);
+      this.eventHandler = eventHandler;
+      this.dispatcher = dispatcher;
+      this.vertexID = vertexID;
+    }
+
+    @Override
+    public void runInputInitializers(
+        List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs) {
+      this.inputs = inputs;
+    }
+    
+    @Override
+    public void shutdown() {
+      hasShutDown = true;
+    }
+
+    public void failInputInitialization() {
+      super.runInputInitializers(inputs);
+      eventHandler.handle(new VertexEventRootInputFailed(vertexID, inputs
+          .get(0).getEntityName(),
+          new RuntimeException("MockInitializerFailed")));
+      dispatcher.await();
+    }
+
+    public void completeInputInitialization(int targetTasks, List<TaskLocationHint>
locationHints) {
+      List<Event> events = Lists.newArrayListWithCapacity(targetTasks + 1);
+
+      RootInputConfigureVertexTasksEvent configEvent = new RootInputConfigureVertexTasksEvent(
+          targetTasks, locationHints);
+      events.add(configEvent);
+      for (int i = 0; i < targetTasks; i++) {
+        RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(
+            i, null);
+        events.add(diEvent);
+      }
+      eventHandler.handle(new VertexEventRootInputInitialized(vertexID, inputs
+          .get(0).getEntityName(), events));
+      dispatcher.await();
+    }
+  }
+
 }


Mime
View raw message