tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [04/23] git commit: TEZ-998. InvalidStateTransitonException: Invalid event: V_INIT at INITED. (hitesh)
Date Fri, 20 Jun 2014 22:35:42 GMT
TEZ-998. InvalidStateTransitonException: Invalid event: V_INIT at INITED. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f0e8a73f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f0e8a73f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f0e8a73f

Branch: refs/heads/branch-0.4.1-incubating
Commit: f0e8a73f41a736c1bee50891e2c963c0f75ceeca
Parents: ee2f6aa
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Apr 1 14:50:34 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Jun 20 15:34:44 2014 -0700

----------------------------------------------------------------------
 .../api/TezRootInputInitializerContext.java     |   6 +
 .../dag/app/dag/RootInputInitializerRunner.java |  16 ++-
 .../TezRootInputInitializerContextImpl.java     |  10 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  62 ++++++++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   2 +-
 .../common/TestMRInputSplitDistributor.java     |   6 +
 .../org/apache/tez/test/TestDAGRecovery.java    | 116 ++++++++++++++-----
 .../org/apache/tez/test/TestDAGRecovery2.java   | 106 ++++++++++++-----
 .../java/org/apache/tez/test/TestProcessor.java |   5 +
 .../apache/tez/test/dag/MultiAttemptDAG.java    |  66 ++++++++++-
 10 files changed, 324 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f0e8a73f/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
index 3dea536..62ced2e 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezRootInputInitializerContext.java
@@ -73,4 +73,10 @@ public interface TezRootInputInitializerContext {
    * @return Number of nodes
    */
   int getNumClusterNodes();
+
+  /**
+   * @return DAG Attempt number
+   */
+  int getDAGAttemptNumber();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f0e8a73f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
index 6b65af3..cb2bf82 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerRunner.java
@@ -65,11 +65,13 @@ public class RootInputInitializerRunner {
   private volatile boolean isStopped = false;
   private final UserGroupInformation dagUgi;
   private final int numClusterNodes;
+  private final int dagAttemptNumber;
 
   @SuppressWarnings("rawtypes")
   public RootInputInitializerRunner(String dagName, String vertexName,
       TezVertexID vertexID, EventHandler eventHandler, UserGroupInformation dagUgi,
-      Resource vertexTaskResource, Resource totalResource, int numTasks, int numNodes) {
+      Resource vertexTaskResource, Resource totalResource, int numTasks, int numNodes,
+      int dagAttemptNumber) {
     this.dagName = dagName;
     this.vertexName = vertexName;
     this.vertexID = vertexID;
@@ -82,13 +84,15 @@ public class RootInputInitializerRunner {
     this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     this.dagUgi = dagUgi;
     this.numClusterNodes = numNodes;
+    this.dagAttemptNumber = dagAttemptNumber;
   }
   
   public void runInputInitializers(List<RootInputLeafOutputDescriptor<InputDescriptor>> inputs) {
     for (RootInputLeafOutputDescriptor<InputDescriptor> input : inputs) {
       ListenableFuture<List<Event>> future = executor
           .submit(new InputInitializerCallable(input, vertexID, dagName,
-              vertexName, dagUgi, numTasks, numClusterNodes, vertexTaskResource, totalResource));
+              vertexName, dagUgi, numTasks, numClusterNodes, vertexTaskResource, totalResource,
+              dagAttemptNumber));
       Futures.addCallback(future, createInputInitializerCallback(input.getEntityName()));
     }
   }
@@ -119,10 +123,12 @@ public class RootInputInitializerRunner {
     private final Resource totalResource;
     private final UserGroupInformation ugi;
     private final int numClusterNodes;
+    private final int dagAttemptNumber;
 
     public InputInitializerCallable(RootInputLeafOutputDescriptor<InputDescriptor> input,
         TezVertexID vertexID, String dagName, String vertexName, UserGroupInformation ugi,
-        int numTasks, int numClusterNodes, Resource vertexTaskResource, Resource totalResource) {
+        int numTasks, int numClusterNodes, Resource vertexTaskResource, Resource totalResource,
+        int dagAttemptNumber) {
       this.input = input;
       this.vertexID = vertexID;
       this.dagName = dagName;
@@ -132,6 +138,7 @@ public class RootInputInitializerRunner {
       this.totalResource = totalResource;
       this.ugi = ugi;
       this.numClusterNodes = numClusterNodes;
+      this.dagAttemptNumber = dagAttemptNumber;
     }
 
     @Override
@@ -142,7 +149,8 @@ public class RootInputInitializerRunner {
           TezRootInputInitializer initializer = createInitializer();
           TezRootInputInitializerContext context = new TezRootInputInitializerContextImpl(vertexID,
               dagName, vertexName, input.getEntityName(), input.getDescriptor(), 
-              numTasks, numClusterNodes, vertexTaskResource, totalResource);
+              numTasks, numClusterNodes, vertexTaskResource, totalResource,
+              dagAttemptNumber);
           return initializer.initialize(context);
         }
       });

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f0e8a73f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
index 5ea02b8..b0fb059 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TezRootInputInitializerContextImpl.java
@@ -36,13 +36,15 @@ public class TezRootInputInitializerContextImpl implements
   private final Resource vertexTaskResource;
   private final Resource totalResource;
   private final int numClusterNodes;
+  private final int dagAttemptNumber;
 
   // TODO Add support for counters - merged with the Vertex counters.
   
   public TezRootInputInitializerContextImpl(TezVertexID vertexID,
       String dagName, String vertexName, String inputName,
       InputDescriptor inputDescriptor, int numTasks, int numClusterNodes,
-      Resource vertexTaskResource, Resource totalResource) {
+      Resource vertexTaskResource, Resource totalResource,
+      int dagAttemptNumber) {
     checkNotNull(vertexID, "vertexID is null");
     checkNotNull(dagName, "dagName is null");
     checkNotNull(inputName, "inputName is null");
@@ -57,6 +59,7 @@ public class TezRootInputInitializerContextImpl implements
     this.vertexTaskResource = vertexTaskResource;
     this.totalResource = totalResource;
     this.numClusterNodes = numClusterNodes;
+    this.dagAttemptNumber = dagAttemptNumber;
   }
 
   @Override
@@ -99,4 +102,9 @@ public class TezRootInputInitializerContextImpl implements
     return numClusterNodes;
   }
 
+  @Override
+  public int getDAGAttemptNumber() {
+    return dagAttemptNumber;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f0e8a73f/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 8e7bf8d..c6e8e3d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -215,6 +215,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   private VertexState recoveredState = VertexState.NEW;
   private List<TezEvent> recoveredEvents = new ArrayList<TezEvent>();
+  private boolean vertexAlreadyInitialized = false;
 
   protected static final
     StateMachineFactory<VertexImpl, VertexState, VertexEventType, VertexEvent>
@@ -256,6 +257,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                       VertexState.RECOVERING),
                   VertexEventType.V_SOURCE_VERTEX_RECOVERED,
                   new RecoverTransition())
+          .addTransition
+              (VertexState.INITED,
+                  EnumSet.of(VertexState.INITED, VertexState.ERROR),
+                  VertexEventType.V_INIT,
+                  new IgnoreInitInInitedTransition())
           .addTransition(VertexState.NEW, VertexState.NEW,
               VertexEventType.V_SOURCE_VERTEX_STARTED,
               new SourceVertexStartedTransition())
@@ -1808,7 +1814,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           try {
             vertex.initializeCommitters();
           } catch (Exception e) {
-            LOG.info("Failed to initialize committers", e);
+            LOG.info("Failed to initialize committers"
+                + ", vertex=" + vertex.logIdentifier, e);
             vertex.finished(VertexState.FAILED,
                 VertexTerminationCause.INIT_FAILURE);
             endState = VertexState.FAILED;
@@ -1874,7 +1881,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               && vertex.hasCommitter
              && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) {
             LOG.warn("Cannot recover vertex as all recovery events not"
-                + " found, vertex=" + vertex.logIdentifier
+                + " found, vertexId=" + vertex.logIdentifier
                 + ", hasCommitters=" + vertex.hasCommitter
                 + ", summaryCompletionSeen=" + vertex.summaryCompleteSeen
                 + ", finalCompletionSeen=" + vertex.vertexCompleteSeen);
@@ -1912,7 +1919,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           break;
         default:
           LOG.warn("Invalid recoveredState found when trying to recover"
-              + " vertex, recoveredState=" + vertex.recoveredState);
+              + " vertex"
+              + ", vertex=" + vertex.logIdentifier
+              + ", recoveredState=" + vertex.recoveredState);
           vertex.finished(VertexState.ERROR);
           endState = VertexState.ERROR;
           break;
@@ -1944,6 +1953,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         if (!vertex.recoveredEvents.isEmpty()) {
           throw new RuntimeException("Invalid Vertex state"
               + ", found non-zero recovered events in invalid state"
+              + ", vertex=" + vertex.logIdentifier
               + ", recoveredState=" + endState
               + ", recoveredEvents=" + vertex.recoveredEvents.size());
         }
@@ -1983,6 +1993,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Routing recovered event"
+            + ", vertex=" + logIdentifier
             + ", eventType=" + tezEvent.getEventType()
             + ", sourceInfo=" + sourceMeta
             + ", destinationVertex" + destVertex.getName());
@@ -2025,6 +2036,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           break;
         default:
           LOG.warn("Received invalid SourceVertexRecovered event"
+              + ", vertex=" + vertex.logIdentifier
               + ", sourceVertex=" + sourceRecoveredEvent.getSourceVertexID()
               + ", sourceVertexState=" + sourceRecoveredEvent.getSourceVertexState());
           return vertex.finished(VertexState.ERROR);
@@ -2034,6 +2046,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           vertex.getInputVerticesCount()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Waiting for source vertices to recover"
+              + ", vertex=" + vertex.logIdentifier
               + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
               + ", totalSourceVertices=" + vertex.getInputVerticesCount());
         }
@@ -2066,10 +2079,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           endState = VertexState.NEW;
           break;
         case INITED:
+          vertex.vertexAlreadyInitialized = true;
           try {
             vertex.initializeCommitters();
           } catch (Exception e) {
-            LOG.info("Failed to initialize committers", e);
+            LOG.info("Failed to initialize committers, vertex="
+                + vertex.logIdentifier, e);
             vertex.finished(VertexState.FAILED,
                 VertexTerminationCause.INIT_FAILURE);
             endState = VertexState.FAILED;
@@ -2077,7 +2092,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           }
           if (!vertex.setParallelism(0,
               null, vertex.recoveredSourceEdgeManagers, true)) {
-            LOG.info("Failed to recover edge managers");
+            LOG.info("Failed to recover edge managers, vertex="
+                + vertex.logIdentifier);
             vertex.finished(VertexState.FAILED,
                 VertexTerminationCause.INIT_FAILURE);
             endState = VertexState.FAILED;
@@ -2090,9 +2106,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   new TaskEventRecoverTask(task.getTaskId()));
             }
           }
-          if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
-            vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
-              VertexEventType.V_START));
+          if (vertex.numInitedSourceVertices != vertex.getInputVerticesCount()) {
+            LOG.info("Vertex already initialized but source vertices have not"
+                + " initialized"
+                + ", vertexId=" + vertex.logIdentifier
+                + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices);
+          } else {
+            if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
+              vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
+                VertexEventType.V_START));
+            }
           }
           endState = VertexState.INITED;
           break;
@@ -2210,6 +2233,26 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
   }
 
+  public static class IgnoreInitInInitedTransition implements
+      MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      LOG.info("Received event during INITED state"
+          + ", vertex=" + vertex.logIdentifier
+          + ", eventType=" + event.getType());
+      if (!vertex.vertexAlreadyInitialized) {
+        LOG.error("Vertex not initialized but in INITED state"
+            + ", vertexId=" + vertex.logIdentifier);
+        return vertex.finished(VertexState.ERROR);
+      } else {
+        return VertexState.INITED;
+      }
+    }
+  }
+
+
+
   public static class InitTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
 
@@ -2311,7 +2354,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       EventHandler eventHandler, int numTasks, int numNodes, 
       Resource vertexTaskResource, Resource totalResource) {
     return new RootInputInitializerRunner(dagName, vertexName, vertexID,
-        eventHandler, dagUgi, vertexTaskResource, totalResource, numTasks, numNodes);
+        eventHandler, dagUgi, vertexTaskResource, totalResource, numTasks, numNodes,
+        appContext.getApplicationAttemptId().getAttemptId());
   }
   
   private VertexState initializeVertexInInitializingState() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f0e8a73f/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index fd26ef0..01a25f6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2281,7 +2281,7 @@ public class TestVertexImpl {
         Resource taskResource, Resource totalResource) throws IOException {
       super(dagName, vertexName, vertexID, eventHandler, 
           UserGroupInformation.getCurrentUser(), 
-          taskResource, totalResource, numTasks, 1);
+          taskResource, totalResource, numTasks, 1, 1);
       this.eventHandler = eventHandler;
       this.dispatcher = dispatcher;
       this.vertexID = vertexID;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f0e8a73f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index abe45f7..1bdc5c3 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -191,6 +191,12 @@ public class TestMRInputSplitDistributor {
     public int getNumClusterNodes() {
       return 10;
     }
+
+    @Override
+    public int getDAGAttemptNumber() {
+      return 1;
+    }
+
   }
 
   @Private

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f0e8a73f/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 29b6b5e..cd54751 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -32,12 +32,20 @@ import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
 import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.DAGStatus.State;
 import org.apache.tez.test.dag.MultiAttemptDAG;
+import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
+import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
+import org.apache.tez.test.dag.SimpleVTestDAG;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -50,17 +58,16 @@ public class TestDAGRecovery {
   private static final Log LOG = LogFactory.getLog(TestDAGRecovery.class);
 
   private static Configuration conf = new Configuration();
-  private static MiniTezCluster miniTezCluster;
+  private static MiniTezCluster miniTezCluster = null;
   private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
       + TestDAGRecovery.class.getName() + "-tmpDir";
-  protected static MiniDFSCluster dfsCluster;
-
+  private static MiniDFSCluster dfsCluster = null;
   private static TezSession tezSession = null;
+  private static FileSystem remoteFs = null;
 
   @BeforeClass
-  public static void setup() throws Exception {
+  public static void beforeClass() throws Exception {
     LOG.info("Starting mini clusters");
-    FileSystem remoteFs = null;
     try {
       conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
       dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
@@ -77,30 +84,78 @@ public class TestDAGRecovery {
       miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
       miniTezCluster.init(miniTezconf);
       miniTezCluster.start();
+    }
+  }
+
+  @AfterClass
+  public static void afterClass() throws InterruptedException {
+    if (tezSession != null) {
+      try {
+        LOG.info("Stopping Tez Session");
+        tezSession.stop();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    Thread.sleep(10000);
+    if (miniTezCluster != null) {
+      try {
+        LOG.info("Stopping MiniTezCluster");
+        miniTezCluster.stop();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    if (dfsCluster != null) {
+      try {
+        LOG.info("Stopping DFSCluster");
+        dfsCluster.shutdown();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
 
-      Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
-          .valueOf(new Random().nextInt(100000))));
-      TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
-
-      TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
-      tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
-      tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
-      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
-          remoteStagingDir.toString());
-      tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
-      tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
-      tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
-      tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
-
-      AMConfiguration amConfig = new AMConfiguration(
-          new HashMap<String, String>(), new HashMap<String, LocalResource>(),
-          tezConf, null);
-      TezSessionConfiguration tezSessionConfig =
-          new TezSessionConfiguration(amConfig, tezConf);
-      tezSession = new TezSession("TestDAGRecovery", tezSessionConfig);
-      tezSession.start();
+  @Before
+  public void setup()  throws Exception {
+    LOG.info("Starting session");
+    Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+        .valueOf(new Random().nextInt(100000))));
+    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
+    tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+    tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+    tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
+    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
+
+    AMConfiguration amConfig = new AMConfiguration(
+        new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+        tezConf, null);
+    TezSessionConfiguration tezSessionConfig =
+        new TezSessionConfiguration(amConfig, tezConf);
+    tezSession = new TezSession("TestDAGRecovery", tezSessionConfig);
+    tezSession.start();
+  }
+
+  @After
+  public void teardown() throws InterruptedException {
+    if (tezSession != null) {
+      try {
+        LOG.info("Stopping Tez Session");
+        tezSession.stop();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
     }
+    tezSession = null;
+    Thread.sleep(10000);
   }
+
   void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
     TezSessionStatus status = tezSession.getSessionStatus();
     while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
@@ -132,4 +187,13 @@ public class TestDAGRecovery {
 
   }
 
+  @Test(timeout=120000)
+  public void testDelayedInit() throws Exception {
+    DAG dag = SimpleVTestDAG.createDAG("DelayedInitDAG", null);
+    dag.getVertex("v1").addInput("i1",
+        new InputDescriptor(NoOpInput.class.getName()),
+        FailingInputInitializer.class);
+    runDAGAndVerify(dag, State.SUCCEEDED);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f0e8a73f/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index 2716fdd..d6d78ee 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -32,6 +32,7 @@ import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
 import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -39,8 +40,13 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatus.State;
 import org.apache.tez.test.dag.MultiAttemptDAG;
+import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
+import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
 import org.apache.tez.test.dag.SimpleVTestDAG;
+import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -53,17 +59,16 @@ public class TestDAGRecovery2 {
   private static final Log LOG = LogFactory.getLog(TestDAGRecovery2.class);
 
   private static Configuration conf = new Configuration();
-  private static MiniTezCluster miniTezCluster;
+  private static MiniTezCluster miniTezCluster = null;
   private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
       + TestDAGRecovery2.class.getName() + "-tmpDir";
-  protected static MiniDFSCluster dfsCluster;
-
+  private static MiniDFSCluster dfsCluster = null;
   private static TezSession tezSession = null;
+  private static FileSystem remoteFs = null;
 
   @BeforeClass
-  public static void setup() throws Exception {
+  public static void beforeClass() throws Exception {
     LOG.info("Starting mini clusters");
-    FileSystem remoteFs = null;
     try {
       conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
       dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
@@ -80,30 +85,77 @@ public class TestDAGRecovery2 {
       miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
       miniTezCluster.init(miniTezconf);
       miniTezCluster.start();
+    }
+  }
 
-      Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
-          .valueOf(new Random().nextInt(100000))));
-      TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
-
-      TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
-      tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 10);
-      tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
-      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
-          remoteStagingDir.toString());
-      tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
-      tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
-      tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
-      tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
-
-      AMConfiguration amConfig = new AMConfiguration(
-          new HashMap<String, String>(), new HashMap<String, LocalResource>(),
-          tezConf, null);
-      TezSessionConfiguration tezSessionConfig =
-          new TezSessionConfiguration(amConfig, tezConf);
-      tezSession = new TezSession("TestDAGRecovery2", tezSessionConfig);
-      tezSession.start();
+  @AfterClass
+  public static void afterClass() throws InterruptedException {
+    if (tezSession != null) {
+      try {
+        LOG.info("Stopping Tez Session");
+        tezSession.stop();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    Thread.sleep(10000);
+    if (miniTezCluster != null) {
+      try {
+        LOG.info("Stopping MiniTezCluster");
+        miniTezCluster.stop();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    if (dfsCluster != null) {
+      try {
+        LOG.info("Stopping DFSCluster");
+        dfsCluster.shutdown();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
     }
   }
+
+  @Before
+  public void setup()  throws Exception {
+    Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+        .valueOf(new Random().nextInt(100000))));
+    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 10);
+    tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+    tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+    tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
+    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
+
+    AMConfiguration amConfig = new AMConfiguration(
+        new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+        tezConf, null);
+    TezSessionConfiguration tezSessionConfig =
+        new TezSessionConfiguration(amConfig, tezConf);
+    tezSession = new TezSession("TestDAGRecovery2", tezSessionConfig);
+    tezSession.start();
+  }
+
+  @After
+  public void teardown() throws InterruptedException {
+    if (tezSession != null) {
+      try {
+        LOG.info("Stopping Tez Session");
+        tezSession.stop();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    tezSession = null;
+    Thread.sleep(10000);
+  }
+
   void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
     TezSessionStatus status = tezSession.getSessionStatus();
     while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
@@ -129,7 +181,7 @@ public class TestDAGRecovery2 {
   }
 
   @Test(timeout=120000)
-  public void testBasicRecovery() throws Exception {
+  public void testFailingCommitter() throws Exception {
     DAG dag = SimpleVTestDAG.createDAG("FailingCommitterDAG", null);
     OutputDescriptor od =
         new OutputDescriptor(MultiAttemptDAG.NoOpOutput.class.getName());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f0e8a73f/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index 6a25f13..db89e91 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -201,6 +201,11 @@ public class TestProcessor implements LogicalIOProcessor {
              " sum= " + sum);
     //sum = summation of input values
     for (Map.Entry<String, LogicalInput> entry : inputs.entrySet()) {
+      if (!(entry.getValue() instanceof TestInput)) {
+        LOG.info("Ignoring non TestInput: " + entry.getKey()
+            + " inputClass= " + entry.getValue().getClass().getSimpleName());
+        continue;
+      }
       TestInput input = (TestInput) entry.getValue();
       int inputValue = input.doRead();
       LOG.info("Reading input: " + entry.getKey() + " inputValue= " + inputValue);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f0e8a73f/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 80d8588..c288f7e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.test.dag;
 
-import com.google.common.primitives.Booleans;
 import com.google.common.primitives.Ints;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,24 +31,27 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.test.TestInput;
 import org.apache.tez.test.TestOutput;
 import org.apache.tez.test.TestProcessor;
-import org.apache.tez.test.dag.MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -201,6 +203,62 @@ public class MultiAttemptDAG {
     }
   }
 
+  public static class FailingInputInitializer implements TezRootInputInitializer {
+
+    @Override
+    public List<Event> initialize(TezRootInputInitializerContext inputVertexContext) throws Exception {
+      try {
+        Thread.sleep(2000l);
+      } catch (InterruptedException e) {
+        // Ignore
+      }
+      if (inputVertexContext.getDAGAttemptNumber() == 1) {
+        LOG.info("Shutting down the AM in 1st attempt");
+        Runtime.getRuntime().halt(-1);
+      }
+      return null;
+    }
+  }
+
+  public static class NoOpInput implements LogicalInput, MemoryUpdateCallback {
+
+    @Override
+    public void setNumPhysicalInputs(int numInputs) {
+
+    }
+
+    @Override
+    public List<Event> initialize(TezInputContext inputContext) throws Exception {
+      inputContext.requestInitialMemory(1l, this);
+      return null;
+    }
+
+    @Override
+    public void start() throws Exception {
+
+    }
+
+    @Override
+    public Reader getReader() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void handleEvents(List<Event> inputEvents) throws Exception {
+
+    }
+
+    @Override
+    public List<Event> close() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void memoryAssigned(long assignedSize) {
+
+    }
+  }
+
   public static class NoOpOutput implements LogicalOutput, MemoryUpdateCallback {
 
     @Override
@@ -240,6 +298,8 @@ public class MultiAttemptDAG {
   }
 
 
+
+
   public static DAG createDAG(String name,
       Configuration conf) throws Exception {
     byte[] payload = null;


Mime
View raw message