tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [1/4] TEZ-847. Support basic AM recovery. (hitesh)
Date Wed, 05 Mar 2014 23:35:16 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 18290c848 -> 5b464f27d


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
new file mode 100644
index 0000000..f03863c
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -0,0 +1,569 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.RuntimeUtils;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestHistoryEventsProtoConversion {
+
+  private static final Log LOG = LogFactory.getLog(
+      TestHistoryEventsProtoConversion.class);
+
+
+  private HistoryEvent testProtoConversion(HistoryEvent event) throws IOException {
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    HistoryEvent deserializedEvent = null;
+    event.toProtoStream(os);
+    os.flush();
+    os.close();
+    deserializedEvent = RuntimeUtils.createClazzInstance(
+        event.getClass().getName());
+    LOG.info("Serialized event to byte array"
+        + ", eventType=" + event.getEventType()
+        + ", bufLen=" + os.toByteArray().length);
+    deserializedEvent.fromProtoStream(
+        new ByteArrayInputStream(os.toByteArray()));
+    return deserializedEvent;
+  }
+
+  private void logEvents(HistoryEvent event,
+      HistoryEvent deserializedEvent) {
+    LOG.info("Initial Event toString: " + event.toString());
+    LOG.info("Deserialized Event toString: " + deserializedEvent.toString());
+  }
+
+  private void testAMLaunchedEvent() throws Exception {
+    AMLaunchedEvent event = new AMLaunchedEvent(
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1),
+        100, 100);
+    AMLaunchedEvent deserializedEvent = (AMLaunchedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getApplicationAttemptId(),
+        deserializedEvent.getApplicationAttemptId());
+    Assert.assertEquals(event.getAppSubmitTime(),
+        deserializedEvent.getAppSubmitTime());
+    Assert.assertEquals(event.getLaunchTime(),
+        deserializedEvent.getLaunchTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testAMStartedEvent() throws Exception {
+    AMStartedEvent event = new AMStartedEvent(
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1), 100);
+    AMStartedEvent deserializedEvent = (AMStartedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getApplicationAttemptId(),
+        deserializedEvent.getApplicationAttemptId());
+    Assert.assertEquals(event.getStartTime(),
+        deserializedEvent.getStartTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testDAGSubmittedEvent() throws Exception {
+    DAGSubmittedEvent event = new DAGSubmittedEvent(TezDAGID.getInstance(
+        ApplicationId.newInstance(0, 1), 1), 1001l,
+        DAGPlan.newBuilder().setName("foo").build(),
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1));
+    DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getApplicationAttemptId(),
+        deserializedEvent.getApplicationAttemptId());
+    Assert.assertEquals(event.getDagID(),
+        deserializedEvent.getDagID());
+    Assert.assertEquals(event.getDAGName(),
+        deserializedEvent.getDAGName());
+    Assert.assertEquals(event.getSubmitTime(),
+        deserializedEvent.getSubmitTime());
+    Assert.assertEquals(event.getDAGPlan(),
+        deserializedEvent.getDAGPlan());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testDAGInitializedEvent() throws Exception {
+    DAGInitializedEvent event = new DAGInitializedEvent(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+    DAGInitializedEvent deserializedEvent = (DAGInitializedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getDagID(),
+        deserializedEvent.getDagID());
+    Assert.assertEquals(event.getInitTime(), deserializedEvent.getInitTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testDAGStartedEvent() throws Exception {
+    DAGStartedEvent event = new DAGStartedEvent(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334l);
+    DAGStartedEvent deserializedEvent = (DAGStartedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getDagID(),
+        deserializedEvent.getDagID());
+    Assert.assertEquals(event.getStartTime(), deserializedEvent.getStartTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testDAGFinishedEvent() throws Exception {
+    {
+      DAGFinishedEvent event = new DAGFinishedEvent(
+          TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
+          DAGState.FAILED, null, null);
+      DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(
+          event.getDagID(),
+          deserializedEvent.getDagID());
+      Assert.assertEquals(event.getState(), deserializedEvent.getState());
+      Assert.assertNotEquals(event.getStartTime(), deserializedEvent.getStartTime());
+      Assert.assertEquals(event.getFinishTime(), deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      TezCounters tezCounters = new TezCounters();
+      tezCounters.addGroup("foo", "bar");
+      tezCounters.getGroup("foo").addCounter("c1", "c1", 100);
+      tezCounters.getGroup("foo").findCounter("c1").increment(1);
+      DAGFinishedEvent event = new DAGFinishedEvent(
+          TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1000l, 20000l,
+          DAGState.FAILED, "bad diagnostics", tezCounters);
+      DAGFinishedEvent deserializedEvent = (DAGFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(
+          event.getDagID(),
+          deserializedEvent.getDagID());
+      Assert.assertEquals(event.getState(), deserializedEvent.getState());
+      Assert.assertNotEquals(event.getStartTime(), deserializedEvent.getStartTime());
+      Assert.assertEquals(event.getFinishTime(), deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+      Assert.assertEquals(101,
+          deserializedEvent.getTezCounters().getGroup("foo").findCounter("c1").getValue());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  private void testVertexInitializedEvent() throws Exception {
+    VertexInitializedEvent event = new VertexInitializedEvent(
+        TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+        "vertex1", 1000l, 15000l, 100, "procName", null);
+    VertexInitializedEvent deserializedEvent = (VertexInitializedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+    Assert.assertEquals(event.getInitRequestedTime(),
+        deserializedEvent.getInitRequestedTime());
+    Assert.assertEquals(event.getInitedTime(),
+        deserializedEvent.getInitedTime());
+    Assert.assertEquals(event.getNumTasks(),
+        deserializedEvent.getNumTasks());
+    Assert.assertEquals(event.getAdditionalInputs(),
+        deserializedEvent.getAdditionalInputs());
+    Assert.assertNull(deserializedEvent.getProcessorName());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testVertexStartedEvent() throws Exception {
+    VertexStartedEvent event = new VertexStartedEvent(
+        TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+        145553l, 12334455l);
+    VertexStartedEvent deserializedEvent = (VertexStartedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+    Assert.assertEquals(event.getStartRequestedTime(),
+        deserializedEvent.getStartRequestedTime());
+    Assert.assertEquals(event.getStartTime(),
+        deserializedEvent.getStartTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testVertexParallelismUpdatedEvent() throws Exception {
+    {
+      VertexParallelismUpdatedEvent event =
+          new VertexParallelismUpdatedEvent(
+              TezVertexID.getInstance(
+                  TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+              100, null, null);
+      VertexParallelismUpdatedEvent deserializedEvent = (VertexParallelismUpdatedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+      Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks());
+      Assert.assertEquals(event.getSourceEdgeManagers(),
+          deserializedEvent.getSourceEdgeManagers());
+      Assert.assertEquals(event.getVertexLocationHint(),
+          deserializedEvent.getVertexLocationHint());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      Map<String,EdgeManagerDescriptor> sourceEdgeManagers
+          = new LinkedHashMap<String, EdgeManagerDescriptor>();
+      sourceEdgeManagers.put("foo", new EdgeManagerDescriptor("bar"));
+      sourceEdgeManagers.put("foo1", new EdgeManagerDescriptor("bar1").setUserPayload(
+          new String("payload").getBytes()));
+      VertexParallelismUpdatedEvent event =
+          new VertexParallelismUpdatedEvent(
+              TezVertexID.getInstance(
+                  TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+              100, new VertexLocationHint(Arrays.asList(new TaskLocationHint(
+                  new HashSet<String>(Arrays.asList("h1")),
+              new HashSet<String>(Arrays.asList("r1"))))),
+              sourceEdgeManagers);
+
+      VertexParallelismUpdatedEvent deserializedEvent = (VertexParallelismUpdatedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+      Assert.assertEquals(event.getNumTasks(), deserializedEvent.getNumTasks());
+      Assert.assertEquals(event.getSourceEdgeManagers().size(),
+          deserializedEvent.getSourceEdgeManagers().size());
+      Assert.assertEquals(event.getSourceEdgeManagers().get("foo").getClassName(),
+          deserializedEvent.getSourceEdgeManagers().get("foo").getClassName());
+      Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo").getUserPayload(),
+          deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
+      Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getClassName(),
+          deserializedEvent.getSourceEdgeManagers().get("foo1").getClassName());
+      Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload(),
+          deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload());
+      Assert.assertEquals(event.getVertexLocationHint(),
+          deserializedEvent.getVertexLocationHint());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  private void testVertexFinishedEvent() throws Exception {
+    {
+      VertexFinishedEvent event =
+          new VertexFinishedEvent(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+              "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
+              null, null);
+      VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getState(), deserializedEvent.getState());
+      Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      VertexFinishedEvent event =
+          new VertexFinishedEvent(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
+              "vertex1", 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
+              "diagnose", new TezCounters());
+      VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getState(), deserializedEvent.getState());
+      Assert.assertEquals(event.getDiagnostics(), deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getTezCounters(), deserializedEvent.getTezCounters());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  private void testTaskStartedEvent() throws Exception {
+    TaskStartedEvent event = new TaskStartedEvent(
+        TezTaskID.getInstance(TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1),
+        "vertex1", 1000l, 100000l);
+    TaskStartedEvent deserializedEvent = (TaskStartedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getTaskID(), deserializedEvent.getTaskID());
+    Assert.assertEquals(event.getScheduledTime(),
+        deserializedEvent.getScheduledTime());
+    Assert.assertEquals(event.getStartTime(),
+        deserializedEvent.getStartTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testTaskFinishedEvent() throws Exception {
+    {
+      TaskFinishedEvent event = new TaskFinishedEvent(
+          TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1),
+          "vertex1", 11000l, 1000000l, null, TaskState.FAILED, null);
+      TaskFinishedEvent deserializedEvent = (TaskFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getTaskID(), deserializedEvent.getTaskID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getState(),
+          deserializedEvent.getState());
+      Assert.assertEquals(event.getTezCounters(),
+          deserializedEvent.getTezCounters());
+      Assert.assertEquals(event.getSuccessfulAttemptID(),
+          deserializedEvent.getSuccessfulAttemptID());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      TaskFinishedEvent event = new TaskFinishedEvent(
+          TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1),
+          "vertex1", 11000l, 1000000l,
+          TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+          TaskState.FAILED, new TezCounters());
+      TaskFinishedEvent deserializedEvent = (TaskFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getTaskID(), deserializedEvent.getTaskID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getState(),
+          deserializedEvent.getState());
+      Assert.assertEquals(event.getTezCounters(),
+          deserializedEvent.getTezCounters());
+      Assert.assertEquals(event.getSuccessfulAttemptID(),
+          deserializedEvent.getSuccessfulAttemptID());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  private void testTaskAttemptStartedEvent() throws Exception {
+    TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+        "vertex1", 10009l, ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
+        "host1", 19999), "inProgress", "Completed");
+    TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getTaskAttemptID(),
+        deserializedEvent.getTaskAttemptID());
+    Assert.assertEquals(event.getContainerId(),
+        deserializedEvent.getContainerId());
+    Assert.assertEquals(event.getNodeId(),
+        deserializedEvent.getNodeId());
+    Assert.assertEquals(event.getStartTime(),
+        deserializedEvent.getStartTime());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testTaskAttemptFinishedEvent() throws Exception {
+    {
+      TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
+          TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+          "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
+          null, null);
+      TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getTaskAttemptID(),
+          deserializedEvent.getTaskAttemptID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getDiagnostics(),
+          deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getState(),
+          deserializedEvent.getState());
+      Assert.assertEquals(event.getCounters(),
+          deserializedEvent.getCounters());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
+          TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
+          "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
+          "diagnose", new TezCounters());
+      TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
+          testProtoConversion(event);
+      Assert.assertEquals(event.getTaskAttemptID(),
+          deserializedEvent.getTaskAttemptID());
+      Assert.assertEquals(event.getFinishTime(),
+          deserializedEvent.getFinishTime());
+      Assert.assertEquals(event.getDiagnostics(),
+          deserializedEvent.getDiagnostics());
+      Assert.assertEquals(event.getState(),
+          deserializedEvent.getState());
+      Assert.assertEquals(event.getCounters(),
+          deserializedEvent.getCounters());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  private void testContainerLaunchedEvent() throws Exception {
+    ContainerLaunchedEvent event = new ContainerLaunchedEvent(
+        ContainerId.newInstance(ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1), 1001), 100034566,
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1));
+    ContainerLaunchedEvent deserializedEvent = (ContainerLaunchedEvent)
+        testProtoConversion(event);
+    Assert.assertEquals(event.getContainerId(),
+        deserializedEvent.getContainerId());
+    Assert.assertEquals(event.getLaunchTime(),
+        deserializedEvent.getLaunchTime());
+    Assert.assertEquals(event.getApplicationAttemptId(),
+        deserializedEvent.getApplicationAttemptId());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testVertexDataMovementEventsGeneratedEvent() throws Exception {
+    VertexDataMovementEventsGeneratedEvent event;
+    try {
+      event = new VertexDataMovementEventsGeneratedEvent(
+          TezVertexID.getInstance(
+              TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), null);
+      Assert.fail("Invalid creation should have errored out");
+    } catch (RuntimeException e) {
+      // Expected
+    }
+    List<TezEvent> events =
+        Arrays.asList(new TezEvent(new DataMovementEvent(1, null), new EventMetaData(
+            EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
+    event = new VertexDataMovementEventsGeneratedEvent(
+            TezVertexID.getInstance(
+                TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events);
+    VertexDataMovementEventsGeneratedEvent deserializedEvent =
+        (VertexDataMovementEventsGeneratedEvent) testProtoConversion(event);
+    Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+    Assert.assertEquals(1,
+        deserializedEvent.getTezEvents().size());
+    Assert.assertEquals(event.getTezEvents().get(0).getEventType(),
+        deserializedEvent.getTezEvents().get(0).getEventType());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testDAGCommitStartedEvent() throws Exception {
+    DAGCommitStartedEvent event = new DAGCommitStartedEvent(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1));
+    DAGCommitStartedEvent deserializedEvent =
+        (DAGCommitStartedEvent) testProtoConversion(event);
+    Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
+    logEvents(event, deserializedEvent);
+  }
+
+  private void testVertexCommitStartedEvent() throws Exception {
+    VertexCommitStartedEvent event = new VertexCommitStartedEvent(
+        TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1));
+    VertexCommitStartedEvent deserializedEvent =
+        (VertexCommitStartedEvent) testProtoConversion(event);
+    Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
+    logEvents(event, deserializedEvent);
+  }
+
+  @Test
+  public void testDefaultProtoConversion() throws Exception {
+    for (HistoryEventType eventType : HistoryEventType.values()) {
+      switch (eventType) {
+        case AM_LAUNCHED:
+          testAMLaunchedEvent();
+          break;
+        case AM_STARTED:
+          testAMStartedEvent();
+          break;
+        case DAG_SUBMITTED:
+          testDAGSubmittedEvent();
+          break;
+        case DAG_INITIALIZED:
+          testDAGInitializedEvent();
+          break;
+        case DAG_STARTED:
+          testDAGStartedEvent();
+          break;
+        case DAG_FINISHED:
+          testDAGFinishedEvent();
+          break;
+        case VERTEX_INITIALIZED:
+          testVertexInitializedEvent();
+          break;
+        case VERTEX_STARTED:
+          testVertexStartedEvent();
+          break;
+        case VERTEX_PARALLELISM_UPDATED:
+          testVertexParallelismUpdatedEvent();
+          break;
+        case VERTEX_FINISHED:
+          testVertexFinishedEvent();
+          break;
+        case TASK_STARTED:
+          testTaskStartedEvent();
+          break;
+        case TASK_FINISHED:
+          testTaskFinishedEvent();
+          break;
+        case TASK_ATTEMPT_STARTED:
+          testTaskAttemptStartedEvent();
+          break;
+        case TASK_ATTEMPT_FINISHED:
+          testTaskAttemptFinishedEvent();
+          break;
+        case CONTAINER_LAUNCHED:
+          testContainerLaunchedEvent();
+          break;
+        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+          testVertexDataMovementEventsGeneratedEvent();
+          break;
+        case DAG_COMMIT_STARTED:
+          testDAGCommitStartedEvent();
+          break;
+        case VERTEX_COMMIT_STARTED:
+          testVertexCommitStartedEvent();
+          break;
+        default:
+          throw new Exception("Unhandled Event type in Unit tests: " + eventType);
+        }
+      }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index edcfd9f..b315368 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -87,11 +87,11 @@
         </configuration>
         <executions>
           <execution>
-            <id>package-tez-full</id>
+            <id>package-tez</id>
             <configuration>
-              <finalName>tez-${project.version}-full</finalName>
+              <finalName>tez-${project.version}</finalName>
               <descriptors>
-                <descriptor>src/main/assembly/tez-dist-full.xml</descriptor>
+                <descriptor>src/main/assembly/tez-dist.xml</descriptor>
               </descriptors>
               <formats>
                 <format>${package.format}</format>
@@ -103,11 +103,11 @@
             </goals>
           </execution>
           <execution>
-            <id>package-tez</id>
+            <id>package-tez-full</id>
             <configuration>
-              <finalName>tez-${project.version}</finalName>
+              <finalName>tez-${project.version}-full</finalName>
               <descriptors>
-                <descriptor>src/main/assembly/tez-dist.xml</descriptor>
+                <descriptor>src/main/assembly/tez-dist-full.xml</descriptor>
               </descriptors>
               <formats>
                 <format>${package.format}</format>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 03f28be..e919516 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -47,10 +47,12 @@ public class MROutputCommitter extends OutputCommitter {
 
   private static final Log LOG = LogFactory.getLog(MROutputCommitter.class);
 
+  private OutputCommitterContext context;
   private org.apache.hadoop.mapreduce.OutputCommitter committer = null;
   private JobContext jobContext = null;
   private volatile boolean initialized = false;
   private JobConf jobConf = null;
+  private boolean newApiCommitter;
 
   @Override
   public void initialize(OutputCommitterContext context) throws IOException {
@@ -66,7 +68,8 @@ public class MROutputCommitter extends OutputCommitter {
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         context.getDAGAttemptNumber());
-    committer = getOutputCommitter(context);
+    this.context = context;
+    committer = getOutputCommitter(this.context);
     jobContext = getJobContextFromVertexContext(context);
     initialized = true;
   }
@@ -101,7 +104,7 @@ public class MROutputCommitter extends OutputCommitter {
       getOutputCommitter(OutputCommitterContext context) {
 
     org.apache.hadoop.mapreduce.OutputCommitter committer = null;
-    boolean newApiCommitter = false;
+    newApiCommitter = false;
     if (jobConf.getBoolean("mapred.reducer.new-api", false)
         || jobConf.getBoolean("mapred.mapper.new-api", false))  {
       newApiCommitter = true;
@@ -118,8 +121,8 @@ public class MROutputCommitter extends OutputCommitter {
       TaskAttemptID taskAttemptID = new TaskAttemptID(
           Long.toString(context.getApplicationId().getClusterTimestamp()),
           context.getApplicationId().getId(),
-          (jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
-              false) ? TaskType.MAP : TaskType.REDUCE),
+          ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
+              TaskType.MAP : TaskType.REDUCE)),
           0, context.getDAGAttemptNumber());
 
       TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
@@ -179,6 +182,29 @@ public class MROutputCommitter extends OutputCommitter {
 
   }
 
+  @Override
+  public boolean isTaskRecoverySupported() {
+    if (!initialized) {
+      throw new RuntimeException("Committer not initialized");
+    }
+    return committer.isRecoverySupported();
+  }
 
+  @Override
+  public void recoverTask(int taskIndex, int attemptId) throws IOException {
+    if (!initialized) {
+      throw new RuntimeException("Committer not initialized");
+    }
+    TaskAttemptID taskAttemptID = new TaskAttemptID(
+        Long.toString(context.getApplicationId().getClusterTimestamp())
+        + String.valueOf(context.getVertexIndex()),
+        context.getApplicationId().getId(),
+        ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
+            TaskType.MAP : TaskType.REDUCE)),
+        taskIndex, attemptId);
+    TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
+        taskAttemptID);
+    committer.recoverTask(taskContext);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
index c05ec57..159934a 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventMetaData.java
@@ -135,7 +135,7 @@ public class EventMetaData implements Writable {
     return "{ producerConsumerType=" + producerConsumerType
         + ", taskVertexName=" + taskVertexName
         + ", edgeVertexName=" + edgeVertexName
-        + ", taskAttemptId=" + taskAttemptID
+        + ", taskAttemptId=" + (taskAttemptID == null? "null" : taskAttemptID)
         + " }";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
index 1e9265a..1ee7b44 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/InputSpec.java
@@ -50,6 +50,10 @@ public class InputSpec implements Writable {
     return inputDescriptor;
   }
 
+  public void setInputDescriptor(InputDescriptor inputDescriptor) {
+    this.inputDescriptor = inputDescriptor;
+  }
+
   public int getPhysicalEdgeCount() {
     return physicalEdgeCount;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index f4e1957..e0e19c9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -454,7 +454,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     context.scheduleVertexTasks(scheduledTasks);
   }
   
-  void schedulePendingTasks() {    
+  void schedulePendingTasks() {
     int numPendingTasks = pendingTasks.size();
     if (numPendingTasks == 0) {
       return;
@@ -526,7 +526,7 @@ public class ShuffleVertexManager implements VertexManagerPlugin {
     }
     
     this.context = context;
-    
+
     this.slowStartMinSrcCompletionFraction = conf
         .getFloat(
             ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
new file mode 100644
index 0000000..29b6b5e
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.test.dag.MultiAttemptDAG;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+public class TestDAGRecovery {
+
+  private static final Log LOG = LogFactory.getLog(TestDAGRecovery.class);
+
+  private static Configuration conf = new Configuration();
+  private static MiniTezCluster miniTezCluster;
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestDAGRecovery.class.getName() + "-tmpDir";
+  protected static MiniDFSCluster dfsCluster;
+
+  private static TezSession tezSession = null;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    LOG.info("Starting mini clusters");
+    FileSystem remoteFs = null;
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+          .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+    if (miniTezCluster == null) {
+      miniTezCluster = new MiniTezCluster(TestDAGRecovery.class.getName(),
+          1, 1, 1);
+      Configuration miniTezconf = new Configuration(conf);
+      miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
+      miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      miniTezCluster.init(miniTezconf);
+      miniTezCluster.start();
+
+      Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+          .valueOf(new Random().nextInt(100000))));
+      TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+      TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+      tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
+      tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+          remoteStagingDir.toString());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+      tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+      tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
+      tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
+
+      AMConfiguration amConfig = new AMConfiguration(
+          new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+          tezConf, null);
+      TezSessionConfiguration tezSessionConfig =
+          new TezSessionConfiguration(amConfig, tezConf);
+      tezSession = new TezSession("TestDAGRecovery", tezSessionConfig);
+      tezSession.start();
+    }
+  }
+  void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
+    TezSessionStatus status = tezSession.getSessionStatus();
+    while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
+      LOG.info("Waiting for session to be ready. Current: " + status);
+      Thread.sleep(100);
+      status = tezSession.getSessionStatus();
+    }
+    if (status == TezSessionStatus.SHUTDOWN) {
+      throw new TezUncheckedException("Unexpected Session shutdown");
+    }
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for dag to complete. Sleeping for 500ms."
+          + " DAG name: " + dag.getName()
+          + " DAG appId: " + dagClient.getApplicationId()
+          + " Current state: " + dagStatus.getState());
+      Thread.sleep(100);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+
+    Assert.assertEquals(finalState, dagStatus.getState());
+  }
+
+  @Test(timeout=120000)
+  public void testBasicRecovery() throws Exception {
+    DAG dag = MultiAttemptDAG.createDAG("TestBasicRecovery", null);
+    runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
new file mode 100644
index 0000000..65c8383
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestOutput;
+import org.apache.tez.test.TestProcessor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MultiAttemptDAG {
+
+  private static final Log LOG =
+      LogFactory.getLog(MultiAttemptDAG.class);
+
+  static Resource defaultResource = Resource.newInstance(100, 0);
+  public static String MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS =
+      "tez.multi-attempt-dag.vertex.num-tasks";
+  public static int MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT = 2;
+
+  public static class FailOnAttemptVertexManagerPlugin
+      implements VertexManagerPlugin {
+    private int numSourceTasks = 0;
+    private AtomicInteger numCompletions = new AtomicInteger();
+    private VertexManagerPluginContext context;
+    private boolean tasksScheduled = false;
+
+    @Override
+    public void initialize(VertexManagerPluginContext context) {
+      this.context = context;
+      for (String input :
+          context.getInputVertexEdgeProperties().keySet()) {
+        LOG.info("Adding sourceTasks for Vertex " + input);
+        numSourceTasks += context.getVertexNumTasks(input);
+        LOG.info("Current numSourceTasks=" + numSourceTasks);
+      }
+    }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+      if (completions != null) {
+        for (Entry<String, List<Integer>> entry : completions.entrySet()) {
+          LOG.info("Received completion events on vertexStarted"
+              + ", vertex=" + entry.getKey()
+              + ", completions=" + entry.getValue().size());
+          numCompletions.addAndGet(entry.getValue().size());
+        }
+      }
+      maybeScheduleTasks();
+    }
+
+    private synchronized void maybeScheduleTasks() {
+      if (numCompletions.get() >= numSourceTasks
+          && !tasksScheduled) {
+        tasksScheduled = true;
+        String payload = new String(context.getUserPayload());
+        int successAttemptId = Integer.valueOf(payload);
+        LOG.info("Checking whether to crash AM or schedule tasks"
+            + ", successfulAttemptID=" + successAttemptId
+            + ", currentAttempt=" + context.getDAGAttemptNumber());
+        if (successAttemptId > context.getDAGAttemptNumber()) {
+          Runtime.getRuntime().halt(-1);
+        } else if (successAttemptId == context.getDAGAttemptNumber()) {
+          LOG.info("Scheduling tasks for vertex=" + context.getVertexName());
+          int numTasks = context.getVertexNumTasks(context.getVertexName());
+          List<Integer> scheduledTasks = new ArrayList<Integer>(numTasks);
+          for (int i=0; i<numTasks; ++i) {
+            scheduledTasks.add(new Integer(i));
+          }
+          context.scheduleVertexTasks(scheduledTasks);
+        }
+      }
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+      LOG.info("Received completion events for source task"
+          + ", vertex=" + srcVertexName
+          + ", taskIdx=" + taskId);
+      numCompletions.incrementAndGet();
+      maybeScheduleTasks();
+    }
+
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+      // Nothing to do
+    }
+
+    @Override
+    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<Event> events) {
+      // Do nothing
+    }
+  }
+
+
+  public static DAG createDAG(String name,
+      Configuration conf) throws Exception {
+    byte[] payload = null;
+    int taskCount = MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT;
+    if (conf != null) {
+      taskCount = conf.getInt(MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS, MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT);
+      payload = TezUtils.createUserPayloadFromConf(conf);
+    }
+    DAG dag = new DAG(name);
+    Vertex v1 = new Vertex("v1", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v2 = new Vertex("v2", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+    Vertex v3 = new Vertex("v3", TestProcessor.getProcDesc(payload), taskCount, defaultResource);
+
+    // Make each vertex manager fail on appropriate attempt
+    v1.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
+        FailOnAttemptVertexManagerPlugin.class.getName())
+        .setUserPayload(new String("1").getBytes()));
+    v2.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
+        FailOnAttemptVertexManagerPlugin.class.getName())
+        .setUserPayload(new String("2").getBytes()));
+    v3.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
+        FailOnAttemptVertexManagerPlugin.class.getName())
+        .setUserPayload(new String("3").getBytes()));
+    dag.addVertex(v1).addVertex(v2).addVertex(v3);
+    dag.addEdge(new Edge(v1, v2,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(payload),
+            TestInput.getInputDesc(payload))));
+    dag.addEdge(new Edge(v2, v3,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER,
+            DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            TestOutput.getOutputDesc(payload),
+            TestInput.getInputDesc(payload))));
+    return dag;
+  }
+
+  public static DAG createDAG(Configuration conf) throws Exception {
+    return createDAG("SimpleVTestDAG", conf);
+  }
+
+}


Mime
View raw message