tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized. (Jeff Zhang via hitesh)
Date Fri, 12 Sep 2014 05:29:55 GMT
Repository: tez
Updated Branches:
  refs/heads/master 16a0f5795 -> fb05aace5


TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex
initialized. (Jeff Zhang via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fb05aace
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fb05aace
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fb05aace

Branch: refs/heads/master
Commit: fb05aace5062dc441da80f5ac8d4e322ab2a15ef
Parents: 16a0f57
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu Sep 11 22:29:29 2014 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu Sep 11 22:29:29 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  7 +-
 .../tez/dag/app/dag/impl/VertexManager.java     | 28 ++++----
 .../tez/dag/app/dag/impl/TestVertexManager.java | 76 ++++++++++++++++++++
 .../org/apache/tez/test/TestDAGRecovery.java    | 67 ++++++++++++++++-
 .../apache/tez/test/dag/MultiAttemptDAG.java    | 35 ++++++++-
 6 files changed, 195 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 519aaa6..f29d48d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ ALL CHANGES:
   TEZ-1559. Add system tests for AM recovery.
   TEZ-850. Recovery unit tests.
   TEZ-853. Support counters recovery.
+  TEZ-1345. Add checks to guarantee all init events are written to recovery to consider vertex initialized.
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 31240cb..6437e5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -156,7 +156,6 @@ import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
@@ -2782,10 +2781,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       VertexEventRootInputInitialized liInitEvent = (VertexEventRootInputInitialized) event;
       VertexState state = vertex.getState();
       if (state == VertexState.INITIALIZING) {
-        vertex.vertexManager.onRootVertexInitialized(
+        List<TezEvent> inputInfoEvents =
+            vertex.vertexManager.onRootVertexInitialized(
             liInitEvent.getInputName(),
             vertex.getAdditionalInputs().get(liInitEvent.getInputName())
                 .getIODescriptor(), liInitEvent.getEvents());
+        if (inputInfoEvents != null && !inputInfoEvents.isEmpty()) {
+          ROUTE_EVENT_TRANSITION.transition(vertex, new VertexEventRouteEvent(vertex.vertexId, inputInfoEvents));
+        }
       }
 
       vertex.numInitializedInputs++;

http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index 177b946..c2ff660 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.annotation.Nullable;
 
@@ -44,7 +45,6 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.runtime.api.Event;
@@ -68,10 +68,11 @@ public class VertexManager {
   VertexManagerPluginContextImpl pluginContext;
   UserPayload payload = null;
   AppContext appContext;
-    
+  ConcurrentHashMap<String, List<TezEvent>> cachedRootInputEventMap;
+
   class VertexManagerPluginContextImpl implements VertexManagerPluginContext {
     // TODO Add functionality to allow VertexManagers to send VertexManagerEvents
-    
+
     private EventMetaData rootEventSourceMetadata = new EventMetaData(EventProducerConsumerType.INPUT,
         managedVertex.getName(), "NULL_VERTEX", null);
     private Map<String, EventMetaData> destinationEventMetadataMap = Maps.newHashMap();
@@ -80,7 +81,7 @@ public class VertexManager {
     public Map<String, EdgeProperty> getInputVertexEdgeProperties() {
       // TODO Something similar for Initial Inputs - payload etc visible
       Map<Vertex, Edge> inputs = managedVertex.getInputVertices();
-      Map<String, EdgeProperty> vertexEdgeMap = 
+      Map<String, EdgeProperty> vertexEdgeMap =
                           Maps.newHashMapWithExpectedSize(inputs.size());
       for (Map.Entry<Vertex, Edge> entry : inputs.entrySet()) {
         vertexEdgeMap.put(entry.getKey().getName(), entry.getValue().getEdgeProperty());
@@ -115,7 +116,7 @@ public class VertexManager {
     @Override
     public Set<String> getVertexInputNames() {
       Set<String> inputNames = null;
-      Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>

+      Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>
           inputs = managedVertex.getAdditionalInputs();
       if (inputs != null) {
         inputNames = inputs.keySet();
@@ -128,7 +129,6 @@ public class VertexManager {
       return payload;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void addRootInputEvents(final String inputName,
         Collection<InputDataInformationEvent> events) {
@@ -142,8 +142,8 @@ public class VertexManager {
               return tezEvent;
             }
           });
-      appContext.getEventHandler().handle(
-          new VertexEventRouteEvent(managedVertex.getVertexId(), Lists.newArrayList(tezEvents)));
+
+      cachedRootInputEventMap.put(inputName,Lists.newArrayList(tezEvents));
       // Recovery handling is taken care of by the Vertex.
     }
 
@@ -201,7 +201,7 @@ public class VertexManager {
     }
   }
 
-  public VertexManager(VertexManagerPluginDescriptor pluginDesc, 
+  public VertexManager(VertexManagerPluginDescriptor pluginDesc,
       Vertex managedVertex, AppContext appContext) {
     checkNotNull(pluginDesc, "pluginDesc is null");
     checkNotNull(managedVertex, "managedVertex is null");
@@ -209,12 +209,13 @@ public class VertexManager {
     this.pluginDesc = pluginDesc;
     this.managedVertex = managedVertex;
     this.appContext = appContext;
+    this.cachedRootInputEventMap = new ConcurrentHashMap<String, List<TezEvent>>();
   }
-  
+
   public VertexManagerPlugin getPlugin() {
     return plugin;
   }
-  
+
   public void initialize() {
     pluginContext = new VertexManagerPluginContextImpl();
     if (pluginDesc != null) {
@@ -246,7 +247,7 @@ public class VertexManager {
 
   public void onSourceTaskCompleted(TezTaskID tezTaskId) {
     Integer taskId = new Integer(tezTaskId.getId());
-    String vertexName = 
+    String vertexName =
         appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
     plugin.onSourceTaskCompleted(vertexName, taskId);
   }
@@ -255,8 +256,9 @@ public class VertexManager {
     plugin.onVertexManagerEventReceived(vmEvent);
   }
 
-  public void onRootVertexInitialized(String inputName, 
+  public List<TezEvent> onRootVertexInitialized(String inputName,
       InputDescriptor inputDescriptor, List<Event> events) {
     plugin.onRootVertexInitialized(inputName, inputDescriptor, events);
+    return cachedRootInputEventMap.get(inputName);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
new file mode 100644
index 0000000..b3e66bc
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Test;
+
+public class TestVertexManager {
+
+  @Test
+  public void testOnRootVertexInitialized() {
+    Vertex mockVertex = mock(Vertex.class, RETURNS_DEEP_STUBS);
+    AppContext mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    doReturn("vertex1").when(mockVertex).getName();
+    when(
+        mockAppContext.getCurrentDAG().getVertex(any(String.class))
+            .getTotalTasks()).thenReturn(1);
+
+    VertexManager vm =
+        new VertexManager(
+            VertexManagerPluginDescriptor.create(RootInputVertexManager.class
+                .getName()), mockVertex, mockAppContext);
+    vm.initialize();
+    InputDescriptor id1 = mock(InputDescriptor.class);
+    List<Event> events1 = new LinkedList<Event>();
+    InputDataInformationEvent diEvent1 =
+        InputDataInformationEvent.createWithSerializedPayload(0, null);
+    events1.add(diEvent1);
+    List<TezEvent> tezEvents1 =
+        vm.onRootVertexInitialized("input1", id1, events1);
+    assertEquals(1, tezEvents1.size());
+    assertEquals(diEvent1, tezEvents1.get(0).getEvent());
+
+    InputDescriptor id2 = mock(InputDescriptor.class);
+    List<Event> events2 = new LinkedList<Event>();
+    InputDataInformationEvent diEvent2 =
+        InputDataInformationEvent.createWithSerializedPayload(0, null);
+    events2.add(diEvent2);
+    List<TezEvent> tezEvents2 =
+        vm.onRootVertexInitialized("input1", id2, events2);
+    assertEquals(tezEvents2.size(), 1);
+    assertEquals(diEvent2, tezEvents2.get(0).getEvent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index c9acdc2..7676313 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -24,21 +24,30 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.dag.app.RecoveryParser;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
 import org.apache.tez.test.dag.MultiAttemptDAG;
 import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
 import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
+import org.apache.tez.test.dag.MultiAttemptDAG.TestRootInputInitializer;
 import org.apache.tez.test.dag.SimpleVTestDAG;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -48,6 +57,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Random;
 
 public class TestDAGRecovery {
@@ -61,6 +71,7 @@ public class TestDAGRecovery {
   private static MiniDFSCluster dfsCluster = null;
   private static TezClient tezSession = null;
   private static FileSystem remoteFs = null;
+  private static TezConfiguration tezConf = null;
 
   @BeforeClass
   public static void beforeClass() throws Exception {
@@ -120,7 +131,7 @@ public class TestDAGRecovery {
         .valueOf(new Random().nextInt(100000))));
     TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
 
-    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf = new TezConfiguration(miniTezCluster.getConfig());
     tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
     tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
@@ -130,6 +141,7 @@ public class TestDAGRecovery {
     tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
     tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, "false");
 
     tezSession = TezClient.create("TestDAGRecovery", tezConf);
     tezSession.start();
@@ -165,13 +177,64 @@ public class TestDAGRecovery {
     Assert.assertEquals(finalState, dagStatus.getState());
   }
 
+  private void verifyRecoveryLog() throws IOException{
+    ApplicationId appId = tezSession.getAppMasterApplicationId();
+    Path tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString());
+    Path recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf);
+
+    FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf);
+    for (int i=1; i<=3; ++i) {
+      Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,i);
+      Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir,
+      appId.toString().replace("application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+      List<HistoryEvent> historyEvents = RecoveryParser.parseDAGRecoveryFile(
+          fs.open(recoveryFilePath));
+
+      int inputInfoEventIndex = -1;
+      int vertexInitedEventIndex = -1;
+      for (int j=0;j<historyEvents.size(); ++j) {
+        HistoryEvent historyEvent = historyEvents.get(j);
+        LOG.info("Parsed event from recovery stream"
+            + ", eventType=" + historyEvent.getEventType()
+            + ", event=" + historyEvent);
+        if (historyEvent.getEventType() ==  HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED) {
+          VertexDataMovementEventsGeneratedEvent dmEvent =
+              (VertexDataMovementEventsGeneratedEvent)historyEvent;
+          // TODO do not need to check whether it is -1 after Tez-1521 is resolved
+          if (dmEvent.getVertexID().getId() == 0 && inputInfoEventIndex == -1) {
+            inputInfoEventIndex = j;
+          }
+        }
+        if (historyEvent.getEventType() == HistoryEventType.VERTEX_INITIALIZED) {
+          VertexInitializedEvent vInitedEvent = (VertexInitializedEvent) historyEvent;
+          if (vInitedEvent.getVertexID().getId() == 0) {
+            vertexInitedEventIndex = j;
+          }
+        }
+      }
+      // v1's init events must be logged before its VertexInitializedEvent (Tez-1345)
+      Assert.assertTrue("can not find VERTEX_DATA_MOVEMENT_EVENTS_GENERATED for v1", inputInfoEventIndex != -1);
+      Assert.assertTrue("can not find VERTEX_INITIALIZED for v1", vertexInitedEventIndex != -1);
+      Assert.assertTrue("VERTEX_DATA_MOVEMENT_EVENTS_GENERATED is logged before VERTEX_INITIALIZED for v1",
+          inputInfoEventIndex < vertexInitedEventIndex);
+    }
+  }
+
   @Test(timeout=120000)
   public void testBasicRecovery() throws Exception {
     DAG dag = MultiAttemptDAG.createDAG("TestBasicRecovery", null);
+    // add input to v1 to make sure that there will be init events for v1 (TEZ-1345)
+    DataSourceDescriptor dataSource =
+        DataSourceDescriptor.create(InputDescriptor.create(NoOpInput.class.getName()),
+           InputInitializerDescriptor.create(TestRootInputInitializer.class.getName()), null);
+    dag.getVertex("v1").addDataSource("Input", dataSource);
+
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
 
+    verifyRecoveryLog();
+
     // it should fail if submitting same dags in recovery mode (TEZ-1064)
-    try{
+    try {
       DAGClient dagClient = tezSession.submitDAG(dag);
       Assert.fail("Expected DAG submit to fail on duplicate dag name");
     } catch (TezException e) {

http://git-wip-us.apache.org/repos/asf/tez/blob/fb05aace/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 7fc9ad7..58b9413 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import com.google.common.primitives.Ints;
 
 import java.nio.ByteBuffer;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -53,12 +54,15 @@ import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.InputInitializerContext;
 import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.test.TestInput;
 import org.apache.tez.test.TestOutput;
 import org.apache.tez.test.TestProcessor;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -148,8 +152,15 @@ public class MultiAttemptDAG {
     }
 
     @Override
-    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
-      // Do nothing
+    public void onRootVertexInitialized(String inputName,
+        InputDescriptor inputDescriptor, List<Event> events) {
+      List<InputDataInformationEvent> inputInfoEvents = new ArrayList<InputDataInformationEvent>();
+      for (Event event: events) {
+        if (event instanceof InputDataInformationEvent) {
+          inputInfoEvents.add((InputDataInformationEvent)event);
+        }
+      }
+      getContext().addRootInputEvents(inputName, inputInfoEvents);
     }
   }
 
@@ -214,6 +225,26 @@ public class MultiAttemptDAG {
     }
   }
 
+  public static class TestRootInputInitializer extends InputInitializer {
+
+    public TestRootInputInitializer(InputInitializerContext initializerContext) {
+      super(initializerContext);
+    }
+
+    @Override
+    public List<Event> initialize() throws Exception {
+      List<Event> events = new ArrayList<Event>();
+      events.add(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.allocate(0)));
+      return events;
+    }
+
+    @Override
+    public void handleInputInitializerEvent(List<InputInitializerEvent> events)
+        throws Exception {
+      throw new UnsupportedOperationException("Not supported");
+    }
+  }
+
   public static class FailingInputInitializer extends InputInitializer {
 
     public FailingInputInitializer(InputInitializerContext initializerContext) {


Mime
View raw message