tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject [1/9] tez git commit: TEZ-2581. Umbrella for Tez Recovery Redesign (zjffdu)
Date Wed, 25 Nov 2015 14:02:21 GMT
Repository: tez
Updated Branches:
  refs/heads/master c4487f966 -> 28f30b0ef


http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 0d6cbcb..7082ca7 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -50,7 +50,7 @@ import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.history.utils.DAGUtils;
@@ -113,15 +113,14 @@ public class HistoryEventTimelineConversion {
       case TASK_ATTEMPT_FINISHED:
         timelineEntity = convertTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) historyEvent);
         break;
-      case VERTEX_PARALLELISM_UPDATED:
-        timelineEntity = convertVertexParallelismUpdatedEvent(
-            (VertexParallelismUpdatedEvent) historyEvent);
+      case VERTEX_CONFIGURE_DONE:
+        timelineEntity = convertVertexReconfigureDoneEvent(
+            (VertexConfigurationDoneEvent) historyEvent);
         break;
       case DAG_RECOVERED:
         timelineEntity = convertDAGRecoveredEvent(
             (DAGRecoveredEvent) historyEvent);
         break;
-      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
       case VERTEX_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_STARTED:
       case VERTEX_GROUP_COMMIT_FINISHED:
@@ -657,8 +656,8 @@ public class HistoryEventTimelineConversion {
     return atsEntity;
   }
 
-  private static TimelineEntity convertVertexParallelismUpdatedEvent(
-      VertexParallelismUpdatedEvent event) {
+  private static TimelineEntity convertVertexReconfigureDoneEvent(
+      VertexConfigurationDoneEvent event) {
     TimelineEntity atsEntity = new TimelineEntity();
     atsEntity.setEntityId(event.getVertexID().toString());
     atsEntity.setEntityType(EntityTypes.TEZ_VERTEX_ID.name());
@@ -669,8 +668,8 @@ public class HistoryEventTimelineConversion {
         event.getVertexID().getDAGId().toString());
 
     TimelineEvent updateEvt = new TimelineEvent();
-    updateEvt.setEventType(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name());
-    updateEvt.setTimestamp(event.getUpdateTime());
+    updateEvt.setEventType(HistoryEventType.VERTEX_CONFIGURE_DONE.name());
+    updateEvt.setTimestamp(event.getReconfigureDoneTime());
 
     Map<String,Object> eventInfo = new HashMap<String, Object>();
     if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) {
@@ -683,7 +682,6 @@ public class HistoryEventTimelineConversion {
       eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers);
     }
     eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
-    eventInfo.put(ATSConstants.OLD_NUM_TASKS, event.getOldNumTasks());
     updateEvt.setEventInfo(eventInfo);
     atsEntity.addEvent(updateEvt);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 8e589d2..7792c62 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -70,12 +70,11 @@ import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
 import org.apache.tez.dag.history.events.TaskFinishedEvent;
 import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.history.events.VertexCommitStartedEvent;
-import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
 import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
 import org.apache.tez.dag.history.logging.EntityTypes;
 import org.apache.tez.dag.history.utils.DAGUtils;
@@ -158,13 +157,13 @@ public class TestHistoryEventTimelineConversion {
           break;
         case VERTEX_INITIALIZED:
           event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
-              random.nextInt(), "proc", null);
+              random.nextInt(), "proc", null, null);
           break;
         case VERTEX_STARTED:
           event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
           break;
-        case VERTEX_PARALLELISM_UPDATED:
-          event = new VertexParallelismUpdatedEvent(tezVertexID, 1, null, null, null, 1);
+        case VERTEX_CONFIGURE_DONE:
+          event = new VertexConfigurationDoneEvent(tezVertexID, 0L, 1, null, null, null, true);
           break;
         case VERTEX_FINISHED:
           event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
@@ -184,7 +183,8 @@ public class TestHistoryEventTimelineConversion {
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
-              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, null, 0, null, 0);
+              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST,
+              null, null, null, null, 0, null, 0);
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),
@@ -193,9 +193,6 @@ public class TestHistoryEventTimelineConversion {
         case CONTAINER_STOPPED:
           event = new ContainerStoppedEvent(containerId, random.nextInt(), -1, applicationAttemptId);
           break;
-        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
-          event = new VertexRecoverableEventsGeneratedEvent();
-          break;
         case DAG_COMMIT_STARTED:
           event = new DAGCommitStartedEvent();
           break;
@@ -524,7 +521,7 @@ public class TestHistoryEventTimelineConversion {
     events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
 
     TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
-        startTime, finishTime, state, error, diagnostics, counters, events, creationTime,
+        startTime, finishTime, state, error, diagnostics, counters, events, null, creationTime,
         tezTaskAttemptID, allocationTime);
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
@@ -668,7 +665,7 @@ public class TestHistoryEventTimelineConversion {
     long initedTime = random.nextLong();
     int numTasks = random.nextInt();
     VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime,
-        initedTime, numTasks, "proc", null);
+        initedTime, numTasks, "proc", null, null);
 
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
@@ -935,7 +932,7 @@ public class TestHistoryEventTimelineConversion {
 
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
-  public void testConvertVertexParallelismUpdatedEvent() {
+  public void testConvertVertexReconfigreDoneEvent() {
     TezVertexID vId = tezVertexID;
     Map<String, EdgeProperty> edgeMgrs =
         new HashMap<String, EdgeProperty>();
@@ -943,8 +940,8 @@ public class TestHistoryEventTimelineConversion {
     edgeMgrs.put("a", EdgeProperty.create(EdgeManagerPluginDescriptor.create("a.class")
         .setHistoryText("text"), DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
         OutputDescriptor.create("Out"), InputDescriptor.create("In")));
-    VertexParallelismUpdatedEvent event = new VertexParallelismUpdatedEvent(vId, 1, null,
-        edgeMgrs, null, 10);
+    VertexConfigurationDoneEvent event = new VertexConfigurationDoneEvent(vId, 0L, 1, null,
+        edgeMgrs, null, true);
 
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(ATSConstants.TEZ_VERTEX_ID, timelineEntity.getEntityType());
@@ -959,9 +956,8 @@ public class TestHistoryEventTimelineConversion {
         .contains(tezDAGID.toString()));
 
     TimelineEvent evt = timelineEntity.getEvents().get(0);
-    Assert.assertEquals(HistoryEventType.VERTEX_PARALLELISM_UPDATED.name(), evt.getEventType());
+    Assert.assertEquals(HistoryEventType.VERTEX_CONFIGURE_DONE.name(), evt.getEventType());
     Assert.assertEquals(1, evt.getEventInfo().get(ATSConstants.NUM_TASKS));
-    Assert.assertEquals(10, evt.getEventInfo().get(ATSConstants.OLD_NUM_TASKS));
     Assert.assertNotNull(evt.getEventInfo().get(ATSConstants.UPDATED_EDGE_MANAGERS));
 
     Map<String, Object> updatedEdgeMgrs = (Map<String, Object>)
@@ -976,7 +972,6 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals("a.class", updatedEdgeMgr.get(DAGUtils.EDGE_MANAGER_CLASS_KEY));
 
     Assert.assertEquals(1, timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS));
-
   }
 
   @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index b44b7d4..63e2b86 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -278,7 +278,7 @@ public class TezEvent implements Writable {
     } else {
       out.writeBoolean(false);
     }
-  }
+  } 
 
   @Override
   public void readFields(DataInput in) throws IOException {

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/AMShutdownController.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/AMShutdownController.java b/tez-tests/src/test/java/org/apache/tez/test/AMShutdownController.java
new file mode 100644
index 0000000..4baf6de
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/AMShutdownController.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.recovery.RecoveryService;
+
+public abstract class AMShutdownController {
+
+  private List<DAGHistoryEvent> historyEvents = new ArrayList<DAGHistoryEvent>();
+  
+  protected AppContext appContext;
+  protected RecoveryService recoveryService;
+  
+  public AMShutdownController(AppContext appContext, RecoveryService recoveryService) {
+    this.appContext = appContext;
+    this.recoveryService = recoveryService;
+  }
+
+  public void preHandleHistoryEvent(DAGHistoryEvent event) {
+    historyEvents.add(event);
+    if (shouldShutdownPreEvent(event, historyEvents)) {
+      System.exit(1);
+    }
+  }
+
+  public void postHandleHistoryEvent(DAGHistoryEvent event) {
+    if (shouldShutdownPostEvent(event, historyEvents)) {
+      System.exit(1);
+    }
+  }
+
+  protected abstract boolean shouldShutdownPreEvent(DAGHistoryEvent curEvent,
+      List<DAGHistoryEvent> historyEvents);
+
+  protected abstract boolean shouldShutdownPostEvent(DAGHistoryEvent curEvent,
+      List<DAGHistoryEvent> historyEvents);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
new file mode 100644
index 0000000..cec8fbd
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.api.TezReflectionException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.recovery.RecoveryService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.test.RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * RecoveryService that invokes a hook before/after handling each recovery event and summary event.
+ *
+ */
+public class RecoveryServiceWithEventHandlingHook extends RecoveryService {
+
+  public static final String AM_RECOVERY_SERVICE_HOOK_CLASS = "tez.test.am.recovery_service.hook";
+  private static final Logger LOG = LoggerFactory.getLogger(RecoveryServiceWithEventHandlingHook.class);
+  private RecoveryServiceHook hook;
+  private boolean shutdownInvoked = false;
+  public RecoveryServiceWithEventHandlingHook(AppContext appContext) {
+    super(appContext);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    String clazz = conf.get(AM_RECOVERY_SERVICE_HOOK_CLASS);
+    Preconditions.checkArgument(clazz != null, "RecoveryServiceHook class is not specified");
+    this.hook = ReflectionUtils.createClazzInstance(clazz, 
+        new Class[]{RecoveryServiceWithEventHandlingHook.class, AppContext.class},
+        new Object[]{this, super.appContext});
+  }
+
+  @Override
+  protected void handleRecoveryEvent(DAGHistoryEvent event) throws IOException {
+    hook.preHandleRecoveryEvent(event);
+    if (shutdownInvoked) {
+      return;
+    }
+    super.handleRecoveryEvent(event);
+    hook.postHandleRecoveryEvent(event);
+  }
+
+  @Override
+  protected void handleSummaryEvent(TezDAGID dagID, HistoryEventType eventType,
+      SummaryEvent summaryEvent) throws IOException {
+    hook.preHandleSummaryEvent(eventType, summaryEvent);
+    if (shutdownInvoked) {
+      return;
+    }
+    super.handleSummaryEvent(dagID, eventType, summaryEvent);
+    hook.postHandleSummaryEvent(eventType, summaryEvent);
+  }
+
+  private void shutdown() {
+    // Shut down the AM from a new thread; exiting on this thread would cause a deadlock
+    // (JVM exit triggers DAGAppMasterShutdownHook, which calls RecoveryService's stop,
+    // which in turn drains all the events).
+    Thread shutdownThread = new Thread("AMShutdown Thread") {
+      @Override
+      public void run() {
+        LOG.info("Try to kill AM");
+        System.exit(1);
+      }
+    };
+    // stop processing recovery events
+    super.setStopped(true);
+    shutdownInvoked = true;
+    shutdownThread.start();
+  }
+
+  /**
+   * Abstract class that allows custom actions before/after processing recovery events
+   *
+   */
+  public static abstract class RecoveryServiceHook {
+
+    protected RecoveryServiceWithEventHandlingHook recoveryService;
+    protected AppContext appContext;
+
+    public RecoveryServiceHook(RecoveryServiceWithEventHandlingHook recoveryService, AppContext appContext) {
+      this.recoveryService = recoveryService;
+      this.appContext = appContext;
+    }
+
+    public abstract void preHandleRecoveryEvent(DAGHistoryEvent event) throws IOException;
+
+    public abstract void postHandleRecoveryEvent(DAGHistoryEvent event) throws IOException;
+
+    public abstract void preHandleSummaryEvent(HistoryEventType eventType,
+        SummaryEvent summaryEvent) throws IOException;
+
+    public abstract void postHandleSummaryEvent(HistoryEventType eventType,
+        SummaryEvent summaryEvent) throws IOException;
+
+  }
+
+  /**
+   * Shuts down the AM before/after a specified recovery event is processed.
+   * This is done only in the first AM attempt.
+   *
+   */
+  public static class SimpleRecoveryEventHook extends RecoveryServiceHook {
+
+    public static final String SIMPLE_SHUTDOWN_CONDITION = "tez.test.recovery.simple_shutdown_condition";
+    private SimpleShutdownCondition shutdownCondition;
+
+    public SimpleRecoveryEventHook(
+        RecoveryServiceWithEventHandlingHook recoveryService, AppContext appContext) {
+      super(recoveryService, appContext);
+      this.shutdownCondition = new SimpleShutdownCondition();
+      try {
+        Preconditions.checkArgument(recoveryService.getConfig().get(SIMPLE_SHUTDOWN_CONDITION) != null,
+            SIMPLE_SHUTDOWN_CONDITION + " is not set in TezConfiguration");
+        this.shutdownCondition.deserialize(recoveryService.getConfig().get(SIMPLE_SHUTDOWN_CONDITION));
+      } catch (IOException e) {
+        throw new TezUncheckedException("Can not initialize SimpleShutdownCondition", e);
+      }
+    }
+
+    @Override
+    public void preHandleRecoveryEvent(DAGHistoryEvent event)
+        throws IOException {
+      if (shutdownCondition.timing.equals(TIMING.PRE)
+          && appContext.getApplicationAttemptId().getAttemptId() == 1
+          && shouldShutdown(event)) {
+        recoveryService.shutdown();
+      }
+    }
+
+    @Override
+    public void postHandleRecoveryEvent(DAGHistoryEvent event)
+        throws IOException {
+      if (shutdownCondition.timing.equals(TIMING.POST)
+          && appContext.getApplicationAttemptId().getAttemptId() == 1
+          && shouldShutdown(event)) {
+        recoveryService.shutdown();
+      }
+    }
+
+    private boolean shouldShutdown(DAGHistoryEvent event) {
+      // only check whether to shutdown when it is the first AM attempt
+      if (appContext.getApplicationAttemptId().getAttemptId() >= 2) {
+        return false;
+      }
+      return shutdownCondition.match(event.getHistoryEvent());
+    }
+ 
+    @Override
+    public void preHandleSummaryEvent(HistoryEventType eventType,
+        SummaryEvent summaryEvent) throws IOException {
+    }
+
+    @Override
+    public void postHandleSummaryEvent(HistoryEventType eventType,
+        SummaryEvent summaryEvent) throws IOException {
+    }
+
+  }
+
+  /**
+   * 
+   * Shuts down the AM when an incoming recovery event matches the configured one.
+   * The condition is serialized as a property of TezConfiguration and deserialized at runtime.
+   */
+  public static class SimpleShutdownCondition {
+
+    public static enum TIMING {
+      PRE, // before the event
+      POST, // after the event
+    }
+
+    private TIMING timing;
+    private HistoryEvent event;
+
+    public SimpleShutdownCondition(TIMING timing, HistoryEvent event) {
+      this.timing = timing;
+      this.event = event;
+    }
+
+    public SimpleShutdownCondition() {
+    }
+
+    private String encodeHistoryEvent(HistoryEvent event) throws IOException {
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      event.toProtoStream(out);
+      return event.getClass().getName() + ","
+          + Base64.encodeBase64String(out.toByteArray());
+    }
+
+    private HistoryEvent decodeHistoryEvent(String eventClass, String base64)
+        throws IOException {
+      ByteArrayInputStream in = new ByteArrayInputStream(
+          Base64.decodeBase64(base64));
+      try {
+        HistoryEvent event = ReflectionUtils.createClazzInstance(eventClass);
+        event.fromProtoStream(in);
+        return event;
+      } catch (TezReflectionException e) {
+        throw new IOException(e);
+      }
+    }
+
+    public String serialize() throws IOException {
+      StringBuilder builder = new StringBuilder();
+      builder.append(timing.name() + ",");
+      builder.append(encodeHistoryEvent(event));
+      return builder.toString();
+    }
+
+    public SimpleShutdownCondition deserialize(String str) throws IOException {
+      String[] tokens = str.split(",");
+      timing = TIMING.valueOf(tokens[0]);
+      this.event = decodeHistoryEvent(tokens[1], tokens[2]);
+      return this;
+    }
+
+    public HistoryEvent getEvent() {
+      return event;
+    }
+
+    public TIMING getTiming() {
+      return timing;
+    }
+
+    public boolean match(HistoryEvent incomingEvent) {
+      switch (event.getEventType()) {
+      case DAG_SUBMITTED:
+        if (incomingEvent.getEventType() == HistoryEventType.DAG_SUBMITTED) {
+          // only compare eventType
+          return true;
+        }
+        break;
+
+      case DAG_INITIALIZED:
+        if (incomingEvent.getEventType() == HistoryEventType.DAG_INITIALIZED) {
+          // only compare eventType
+          return true;
+        }
+        break;
+
+      case DAG_STARTED:
+        if (incomingEvent.getEventType() == HistoryEventType.DAG_STARTED) {
+          // only compare eventType
+          return true;
+        }
+        break;
+
+      case DAG_FINISHED:
+        if (incomingEvent.getEventType() == HistoryEventType.DAG_FINISHED) {
+          // only compare eventType
+          return true;
+        }
+        break;
+
+      case VERTEX_INITIALIZED:
+        if (incomingEvent.getEventType() == HistoryEventType.VERTEX_INITIALIZED) {
+          VertexInitializedEvent otherEvent = (VertexInitializedEvent) incomingEvent;
+          VertexInitializedEvent conditionEvent = (VertexInitializedEvent) event;
+          // compare vertexId
+          return otherEvent.getVertexID().getId() == conditionEvent.getVertexID().getId();
+        }
+        break;
+
+      case VERTEX_STARTED:
+        if (incomingEvent.getEventType() == HistoryEventType.VERTEX_STARTED) {
+          VertexStartedEvent otherEvent = (VertexStartedEvent) incomingEvent;
+          VertexStartedEvent conditionEvent = (VertexStartedEvent) event;
+          // compare vertexId
+          return otherEvent.getVertexID().getId() == conditionEvent.getVertexID().getId();
+        }
+        break;
+
+      case VERTEX_FINISHED:
+        if (incomingEvent.getEventType() == HistoryEventType.VERTEX_FINISHED) {
+          VertexFinishedEvent otherEvent = (VertexFinishedEvent) incomingEvent;
+          VertexFinishedEvent conditionEvent = (VertexFinishedEvent) event;
+          // compare vertexId
+          return otherEvent.getVertexID().getId() == conditionEvent.getVertexID().getId();
+        }
+        break;
+      case VERTEX_CONFIGURE_DONE:
+        if (incomingEvent.getEventType() == HistoryEventType.VERTEX_CONFIGURE_DONE) {
+          VertexConfigurationDoneEvent otherEvent = (VertexConfigurationDoneEvent) incomingEvent;
+          VertexConfigurationDoneEvent conditionEvent = (VertexConfigurationDoneEvent) event;
+          // compare vertexId
+          return otherEvent.getVertexID().getId() == conditionEvent.getVertexID().getId();
+        }
+        break;
+      case TASK_STARTED:
+        if (incomingEvent.getEventType() == HistoryEventType.TASK_STARTED) {
+          TaskStartedEvent otherEvent = (TaskStartedEvent) incomingEvent;
+          TaskStartedEvent conditionEvent = (TaskStartedEvent) event;
+          // compare vertexId and taskId
+          return otherEvent.getTaskID().getVertexID().getId() == conditionEvent.getTaskID().getVertexID().getId()
+              && otherEvent.getTaskID().getId() == conditionEvent.getTaskID().getId();
+        }
+        break;
+
+      case TASK_FINISHED:
+        if (incomingEvent.getEventType() == HistoryEventType.TASK_FINISHED) {
+          TaskFinishedEvent otherEvent = (TaskFinishedEvent) incomingEvent;
+          TaskFinishedEvent conditionEvent = (TaskFinishedEvent) event;
+          // compare vertexId and taskId
+          return otherEvent.getTaskID().getVertexID().getId() == conditionEvent.getTaskID().getVertexID().getId()
+              && otherEvent.getTaskID().getId() == conditionEvent.getTaskID().getId();
+        }
+        break;
+
+      case TASK_ATTEMPT_STARTED:
+        if (incomingEvent.getEventType() == HistoryEventType.TASK_ATTEMPT_STARTED) {
+          TaskAttemptStartedEvent otherEvent = (TaskAttemptStartedEvent) incomingEvent;
+          TaskAttemptStartedEvent conditionEvent = (TaskAttemptStartedEvent) event;
+          // compare vertexId, taskId & taskAttemptId
+          return otherEvent.getTaskAttemptID().getTaskID().getVertexID().getId() 
+              == conditionEvent.getTaskAttemptID().getTaskID().getVertexID().getId()
+              && otherEvent.getTaskAttemptID().getTaskID().getId() == conditionEvent.getTaskAttemptID().getTaskID().getId()
+              && otherEvent.getTaskAttemptID().getId() == conditionEvent.getTaskAttemptID().getId();
+        }
+        break;
+
+      case TASK_ATTEMPT_FINISHED:
+        if (incomingEvent.getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) {
+          TaskAttemptFinishedEvent otherEvent = (TaskAttemptFinishedEvent) incomingEvent;
+          TaskAttemptFinishedEvent conditionEvent = (TaskAttemptFinishedEvent) event;
+          // compare vertexId, taskId & taskAttemptId
+          return otherEvent.getTaskAttemptID().getTaskID().getVertexID().getId() 
+              == conditionEvent.getTaskAttemptID().getTaskID().getVertexID().getId()
+              && otherEvent.getTaskAttemptID().getTaskID().getId() == conditionEvent.getTaskAttemptID().getTaskID().getId()
+              && otherEvent.getTaskAttemptID().getId() == conditionEvent.getTaskAttemptID().getId();
+        }
+        break;
+      default:
+        LOG.info("do nothing with event:"
+            + event.getEventType());
+      }
+
+      return false;
+    }
+    
+    public HistoryEventType getEventType() {
+      return event.getEventType();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 778825b..8e41b7e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -24,26 +24,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
-import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatus.State;
-import org.apache.tez.dag.app.RecoveryParser;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.test.dag.MultiAttemptDAG;
 import org.apache.tez.test.dag.MultiAttemptDAG.FailingInputInitializer;
 import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
@@ -57,8 +49,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Random;
 
 public class TestDAGRecovery {
@@ -176,56 +166,6 @@ public class TestDAGRecovery {
     Assert.assertEquals(finalState, dagStatus.getState());
   }
 
-  private void verifyRecoveryLog() throws IOException{
-    ApplicationId appId = tezSession.getAppMasterApplicationId();
-    Path tezSystemStagingDir = TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString());
-    Path recoveryDataDir = TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf);
-
-    FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf);
-    // verify recovery logs in each attempt
-    for (int attemptNum=1; attemptNum<=3; ++attemptNum) {
-      List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();
-      // read the recovery logs for current attempt
-      // since dag recovery logs is dispersed in each attempt's recovery directory,
-      // so need to read recovery logs from the first attempt to current attempt
-      for (int i=1 ;i<=attemptNum;++i) {
-        Path currentAttemptRecoveryDataDir = TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir,i);
-        Path recoveryFilePath = new Path(currentAttemptRecoveryDataDir,
-        appId.toString().replace("application", "dag") + "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
-        historyEvents.addAll(RecoveryParser.parseDAGRecoveryFile(
-            fs.open(recoveryFilePath)));
-      }
-
-      int inputInfoEventIndex = -1;
-      int vertexInitedEventIndex = -1;
-      for (int j=0;j<historyEvents.size(); ++j) {
-        HistoryEvent historyEvent = historyEvents.get(j);
-        LOG.info("Parsed event from recovery stream"
-            + ", eventType=" + historyEvent.getEventType()
-            + ", event=" + historyEvent);
-        if (historyEvent.getEventType() ==  HistoryEventType.VERTEX_DATA_MOVEMENT_EVENTS_GENERATED) {
-          VertexRecoverableEventsGeneratedEvent dmEvent =
-              (VertexRecoverableEventsGeneratedEvent) historyEvent;
-          // TODO do not need to check whether it is -1 after Tez-1521 is resolved
-          if (dmEvent.getVertexID().getId() == 0 && inputInfoEventIndex == -1) {
-            inputInfoEventIndex = j;
-          }
-        }
-        if (historyEvent.getEventType() == HistoryEventType.VERTEX_INITIALIZED) {
-          VertexInitializedEvent vInitedEvent = (VertexInitializedEvent) historyEvent;
-          if (vInitedEvent.getVertexID().getId() == 0) {
-            vertexInitedEventIndex = j;
-          }
-        }
-      }
-      // v1's init events must be logged before its VertexInitializedEvent (Tez-1345)
-      Assert.assertTrue("can not find VERTEX_DATA_MOVEMENT_EVENTS_GENERATED for v1", inputInfoEventIndex != -1);
-      Assert.assertTrue("can not find VERTEX_INITIALIZED for v1", vertexInitedEventIndex != -1);
-      Assert.assertTrue("VERTEX_DATA_MOVEMENT_EVENTS_GENERATED is logged before VERTEX_INITIALIZED for v1",
-          inputInfoEventIndex < vertexInitedEventIndex);
-    }
-  }
-
   @Test(timeout=120000)
   public void testBasicRecovery() throws Exception {
     DAG dag = MultiAttemptDAG.createDAG("TestBasicRecovery", null);
@@ -236,8 +176,6 @@ public class TestDAGRecovery {
     dag.getVertex("v1").addDataSource("Input", dataSource);
 
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
-
-    verifyRecoveryLog();
   }
 
   @Test(timeout=120000)

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
new file mode 100644
index 0000000..45582a1
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.app.RecoveryParser;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.VertexStats;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.history.recovery.RecoveryService;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.examples.HashJoinExample;
+import org.apache.tez.examples.OrderedWordCount;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.test.RecoveryServiceWithEventHandlingHook.SimpleRecoveryEventHook;
+import org.apache.tez.test.RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition;
+import org.apache.tez.test.RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class TestRecovery {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestRecovery.class);
+
+  private static Configuration conf = new Configuration();
+  private static MiniTezCluster miniTezCluster = null;
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestRecovery.class.getName() + "-tmpDir";
+  private static MiniDFSCluster dfsCluster = null;
+  private static FileSystem remoteFs = null;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    LOG.info("Starting mini clusters");
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+          .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+    if (miniTezCluster == null) {
+      miniTezCluster = new MiniTezCluster(TestRecovery.class.getName(), 1, 1, 1);
+      Configuration miniTezconf = new Configuration(conf);
+      miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
+      miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      miniTezCluster.init(miniTezconf);
+      miniTezCluster.start();
+    }
+  }
+
+  @AfterClass
+  public static void afterClass() throws InterruptedException {
+    if (miniTezCluster != null) {
+      try {
+        LOG.info("Stopping MiniTezCluster");
+        miniTezCluster.stop();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    if (dfsCluster != null) {
+      try {
+        LOG.info("Stopping DFSCluster");
+        dfsCluster.shutdown();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @Test(timeout=1800000)
+  public void testRecovery_OrderedWordCount() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
+        1);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexId0 = TezVertexID.getInstance(dagId, 0);
+    TezVertexID vertexId1 = TezVertexID.getInstance(dagId, 1);
+    TezVertexID vertexId2 = TezVertexID.getInstance(dagId, 2);
+    ContainerId containerId = ContainerId.newContainerId(
+        ApplicationAttemptId.newInstance(appId, 1), 1);
+    NodeId nodeId = NodeId.newInstance("localhost", 10);
+    
+    List<TezEvent> initGeneratedEvents = Lists.newArrayList(
+            new TezEvent(InputDataInformationEvent.createWithObjectPayload(0, new Object()), null));
+
+    List<SimpleShutdownCondition> shutdownConditions = Lists
+        .newArrayList(
+            new SimpleShutdownCondition(TIMING.POST, new DAGInitializedEvent(
+                dagId, 0L, "username", "dagName", null)),
+            new SimpleShutdownCondition(TIMING.POST, new DAGStartedEvent(dagId,
+                0L, "username", "dagName")),
+            new SimpleShutdownCondition(TIMING.POST, new DAGFinishedEvent(
+                dagId, 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(),
+                "username", "dagName", new HashMap<String, Integer>(),
+                ApplicationAttemptId.newInstance(appId, 1))),
+            new SimpleShutdownCondition(TIMING.POST,
+                new VertexInitializedEvent(vertexId0, "Tokenizer", 0L, 0L, 0,
+                    "", null, initGeneratedEvents)),
+            new SimpleShutdownCondition(TIMING.POST,
+                new VertexInitializedEvent(vertexId1, "Summation", 0L, 0L, 0,
+                    "", null, null)),
+            new SimpleShutdownCondition(TIMING.POST,
+                new VertexInitializedEvent(vertexId2, "Sorter", 0L, 0L, 0, "",
+                    null, null)),
+
+            new SimpleShutdownCondition(TIMING.POST,
+                new VertexConfigurationDoneEvent(vertexId0, 0L, 2, null, null,
+                    null, true)),
+                        
+            new SimpleShutdownCondition(TIMING.POST,
+                new VertexConfigurationDoneEvent(vertexId1, 0L, 2, null, null,
+                    null, true)),
+
+            new SimpleShutdownCondition(TIMING.POST,
+                new VertexConfigurationDoneEvent(vertexId2, 0L, 2, null, null,
+                    null, true)),
+            new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
+                vertexId0, 0L, 0L)),
+            new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
+                vertexId1, 0L, 0L)),
+            new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
+                vertexId2, 0L, 0L)),
+
+            new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+                vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+                VertexState.SUCCEEDED, "", new TezCounters(),
+                new VertexStats(), new HashMap<String, Integer>())),
+            new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+                vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+                VertexState.SUCCEEDED, "", new TezCounters(),
+                new VertexStats(), new HashMap<String, Integer>())),
+            new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+                vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+                VertexState.SUCCEEDED, "", new TezCounters(),
+                new VertexStats(), new HashMap<String, Integer>())),
+
+            new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(
+                TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L)),
+            new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(
+                TezTaskID.getInstance(vertexId1, 0), "vertexName", 0L, 0L)),
+            new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(
+                TezTaskID.getInstance(vertexId2, 0), "vertexName", 0L, 0L)),
+
+            new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent(
+                TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L,
+                null, TaskState.SUCCEEDED, "", new TezCounters(), 0)),
+            new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent(
+                TezTaskID.getInstance(vertexId1, 0), "vertexName", 0L, 0L,
+                null, TaskState.SUCCEEDED, "", new TezCounters(), 0)),
+            new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent(
+                TezTaskID.getInstance(vertexId2, 0), "vertexName", 0L, 0L,
+                null, TaskState.SUCCEEDED, "", new TezCounters(), 0)),
+
+            new SimpleShutdownCondition(TIMING.POST,
+                new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(
+                    TezTaskID.getInstance(vertexId0, 0), 0), "vertexName", 0L,
+                    containerId, nodeId, "", "", "")),
+            new SimpleShutdownCondition(TIMING.POST,
+                new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(
+                    TezTaskID.getInstance(vertexId1, 0), 0), "vertexName", 0L,
+                    containerId, nodeId, "", "", "")),
+            new SimpleShutdownCondition(TIMING.POST,
+                new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(
+                    TezTaskID.getInstance(vertexId2, 0), 0), "vertexName", 0L,
+                    containerId, nodeId, "", "", ""))
+
+        );
+
+    Random rand = new Random();
+    for (int i = 0; i < shutdownConditions.size(); i++) {
+      // Randomly run roughly half of the shutdown scenarios so that the
+      // test finishes within its timeout.
+      if (rand.nextDouble() < 0.5) {
+        testOrderedWordCount(shutdownConditions.get(i), true);
+      }
+    }
+  }
+
+  private void testOrderedWordCount(SimpleShutdownCondition shutdownCondition,
+      boolean enableAutoParallelism) throws Exception {
+    LOG.info("shutdownCondition:" + shutdownCondition.getEventType()
+        + ", event=" + shutdownCondition.getEvent());
+    String inputDirStr = "/tmp/owc-input/";
+    Path inputDir = new Path(inputDirStr);
+    Path stagingDirPath = new Path("/tmp/owc-staging-dir");
+    remoteFs.mkdirs(inputDir);
+    remoteFs.mkdirs(stagingDirPath);
+    TestTezJobs.generateOrderedWordCountInput(inputDir, remoteFs);
+
+    String outputDirStr = "/tmp/owc-output/";
+    Path outputDir = new Path(outputDirStr);
+
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+    tezConf.set(TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS,
+        RecoveryServiceWithEventHandlingHook.class.getName());
+    tezConf.set(
+        RecoveryServiceWithEventHandlingHook.AM_RECOVERY_SERVICE_HOOK_CLASS,
+        SimpleRecoveryEventHook.class.getName());
+    tezConf.set(SimpleRecoveryEventHook.SIMPLE_SHUTDOWN_CONDITION,
+        shutdownCondition.serialize());
+    tezConf.setBoolean(
+        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+        enableAutoParallelism);
+    tezConf.setBoolean(
+        RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, false);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    tezConf.setBoolean(
+        TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
+    tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO;org.apache.tez=DEBUG");
+
+    OrderedWordCount job = new OrderedWordCount();
+    Assert
+        .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[] {
+            inputDirStr, outputDirStr, "5" }, null) == 0);
+    TestTezJobs.verifyOutput(outputDir, remoteFs);
+    List<HistoryEvent> historyEventsOfAttempt1 = RecoveryParser
+        .readRecoveryEvents(tezConf, job.getAppId(), 1);
+    HistoryEvent lastEvent = historyEventsOfAttempt1
+        .get(historyEventsOfAttempt1.size() - 1);
+    assertEquals(shutdownCondition.getEvent().getEventType(),
+        lastEvent.getEventType());
+    assertTrue(shutdownCondition.match(lastEvent));
+
+  }
+
+  @Test(timeout = 1800000)
+  public void testRecovery_HashJoin() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(),
+        1);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexId0 = TezVertexID.getInstance(dagId, 0);
+    TezVertexID vertexId1 = TezVertexID.getInstance(dagId, 1);
+    TezVertexID vertexId2 = TezVertexID.getInstance(dagId, 2);
+    ContainerId containerId = ContainerId.newContainerId(
+        ApplicationAttemptId.newInstance(appId, 1), 1);
+    NodeId nodeId = NodeId.newInstance("localhost", 10);
+    List<TezEvent> initGeneratedEvents = Lists.newArrayList(
+        new TezEvent(InputDataInformationEvent.createWithObjectPayload(0, new Object()), null));
+
+    List<SimpleShutdownCondition> shutdownConditions = Lists.newArrayList(
+
+        new SimpleShutdownCondition(TIMING.POST, new DAGInitializedEvent(dagId,
+            0L, "username", "dagName", null)),
+        new SimpleShutdownCondition(TIMING.POST, new DAGStartedEvent(dagId, 0L,
+            "username", "dagName")),
+        new SimpleShutdownCondition(TIMING.POST, new DAGFinishedEvent(dagId,
+            0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(), "username",
+            "dagName", new HashMap<String, Integer>(), ApplicationAttemptId
+                .newInstance(appId, 1))),
+        new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(
+            vertexId0, "hashSide", 0L, 0L, 0, "", null, initGeneratedEvents)),
+        new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(
+            vertexId1, "streamingSide", 0L, 0L, 0, "", null, null)),
+        new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(
+            vertexId2, "joiner", 0L, 0L, 0, "", null, null)),
+
+        new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
+            vertexId0, 0L, 0L)),
+        new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
+            vertexId1, 0L, 0L)),
+        new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
+            vertexId2, 0L, 0L)),
+
+        new SimpleShutdownCondition(TIMING.POST,
+            new VertexConfigurationDoneEvent(vertexId0, 0L, 2, null, null,
+                null, true)),
+                    
+        new SimpleShutdownCondition(TIMING.POST,
+            new VertexConfigurationDoneEvent(vertexId1, 0L, 2, null, null,
+                null, true)),
+
+        new SimpleShutdownCondition(TIMING.POST,
+            new VertexConfigurationDoneEvent(vertexId2, 0L, 2, null, null,
+                null, true)),
+                    
+        new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+            vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+            VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(),
+            new HashMap<String, Integer>())),
+        new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+            vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+            VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(),
+            new HashMap<String, Integer>())),
+        new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
+            vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
+            VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(),
+            new HashMap<String, Integer>())),
+
+        new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(TezTaskID
+            .getInstance(vertexId0, 0), "vertexName", 0L, 0L)),
+        new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(TezTaskID
+            .getInstance(vertexId1, 0), "vertexName", 0L, 0L)),
+        new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(TezTaskID
+            .getInstance(vertexId2, 0), "vertexName", 0L, 0L)),
+
+        new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent(
+            TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L, null,
+            TaskState.SUCCEEDED, "", new TezCounters(), 0)),
+        new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent(
+            TezTaskID.getInstance(vertexId1, 0), "vertexName", 0L, 0L, null,
+            TaskState.SUCCEEDED, "", new TezCounters(), 0)),
+        new SimpleShutdownCondition(TIMING.POST, new TaskFinishedEvent(
+            TezTaskID.getInstance(vertexId2, 0), "vertexName", 0L, 0L, null,
+            TaskState.SUCCEEDED, "", new TezCounters(), 0)),
+
+        new SimpleShutdownCondition(TIMING.POST,
+            new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(
+                TezTaskID.getInstance(vertexId0, 0), 0), "vertexName", 0L,
+                containerId, nodeId, "", "", "")),
+        new SimpleShutdownCondition(TIMING.POST,
+            new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(
+                TezTaskID.getInstance(vertexId1, 0), 0), "vertexName", 0L,
+                containerId, nodeId, "", "", "")),
+        new SimpleShutdownCondition(TIMING.POST,
+            new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(
+                TezTaskID.getInstance(vertexId2, 0), 0), "vertexName", 0L,
+                containerId, nodeId, "", "", ""))
+
+    );
+
+    Random rand = new Random();
+    for (int i = 0; i < shutdownConditions.size(); i++) {
+      // Randomly run roughly half of the shutdown scenarios so that the
+      // test finishes within its timeout.
+      if (rand.nextDouble() < 0.5) {
+        testHashJoinExample(shutdownConditions.get(i), true);
+      }
+    }
+  }
+
+  private void testHashJoinExample(SimpleShutdownCondition shutdownCondition,
+      boolean enableAutoParallelism) throws Exception {
+    HashJoinExample hashJoinExample = new HashJoinExample();
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+    tezConf.set(TezConfiguration.TEZ_AM_RECOVERY_SERVICE_CLASS,
+        RecoveryServiceWithEventHandlingHook.class.getName());
+    tezConf.set(
+        RecoveryServiceWithEventHandlingHook.AM_RECOVERY_SERVICE_HOOK_CLASS,
+        SimpleRecoveryEventHook.class.getName());
+    tezConf.set(SimpleRecoveryEventHook.SIMPLE_SHUTDOWN_CONDITION,
+        shutdownCondition.serialize());
+    tezConf.setBoolean(
+        ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+        enableAutoParallelism);
+    tezConf.setBoolean(
+        RecoveryService.TEZ_TEST_RECOVERY_DRAIN_EVENTS_WHEN_STOPPED, false);
+    tezConf.setBoolean(
+        TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
+    tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO;org.apache.tez=DEBUG");
+
+    hashJoinExample.setConf(tezConf);
+    Path stagingDirPath = new Path("/tmp/tez-staging-dir");
+    Path inPath1 = new Path("/tmp/hashJoin/inPath1");
+    Path inPath2 = new Path("/tmp/hashJoin/inPath2");
+    Path outPath = new Path("/tmp/hashJoin/outPath");
+    remoteFs.delete(outPath, true);
+    remoteFs.mkdirs(inPath1);
+    remoteFs.mkdirs(inPath2);
+    remoteFs.mkdirs(stagingDirPath);
+
+    Set<String> expectedResult = new HashSet<String>();
+
+    FSDataOutputStream out1 = remoteFs.create(new Path(inPath1, "file"));
+    FSDataOutputStream out2 = remoteFs.create(new Path(inPath2, "file"));
+    BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter(out1));
+    BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter(out2));
+    for (int i = 0; i < 20; i++) {
+      String term = "term" + i;
+      writer1.write(term);
+      writer1.newLine();
+      if (i % 2 == 0) {
+        writer2.write(term);
+        writer2.newLine();
+        expectedResult.add(term);
+      }
+    }
+    writer1.close();
+    writer2.close();
+    out1.close();
+    out2.close();
+
+    String[] args = new String[] {
+        "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "="
+            + stagingDirPath.toString(), inPath1.toString(),
+        inPath2.toString(), "1", outPath.toString() };
+    assertEquals(0, hashJoinExample.run(args));
+
+    FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() {
+      public boolean accept(Path p) {
+        String name = p.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+      }
+    });
+    assertEquals(1, statuses.length);
+    FSDataInputStream inStream = remoteFs.open(statuses[0].getPath());
+    BufferedReader reader = new BufferedReader(new InputStreamReader(inStream));
+    String line;
+    while ((line = reader.readLine()) != null) {
+      assertTrue(expectedResult.remove(line));
+    }
+    reader.close();
+    inStream.close();
+    assertEquals(0, expectedResult.size());
+
+    List<HistoryEvent> historyEventsOfAttempt1 = RecoveryParser
+        .readRecoveryEvents(tezConf, hashJoinExample.getAppId(), 1);
+    HistoryEvent lastEvent = historyEventsOfAttempt1
+        .get(historyEventsOfAttempt1.size() - 1);
+    assertEquals(shutdownCondition.getEvent().getEventType(),
+        lastEvent.getEventType());
+    assertTrue(shutdownCondition.match(lastEvent));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index ab89ddb..c3e8487 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -573,7 +573,7 @@ public class TestTezJobs {
     }
   }
 
-  private void generateOrderedWordCountInput(Path inputDir, FileSystem fs) throws IOException {
+  public static void generateOrderedWordCountInput(Path inputDir, FileSystem fs) throws IOException {
     Path dataPath1 = new Path(inputDir, "inPath1");
     Path dataPath2 = new Path(inputDir, "inPath2");
 
@@ -606,7 +606,7 @@ public class TestTezJobs {
     }
   }
 
-  private void verifyOrderedWordCountOutput(Path resultFile, FileSystem fs) throws IOException {
+  public static void verifyOrderedWordCountOutput(Path resultFile, FileSystem fs) throws IOException {
     FSDataInputStream inputStream = fs.open(resultFile);
     final String prefix = "a";
     int currentCounter = 10;
@@ -631,7 +631,7 @@ public class TestTezJobs {
     Assert.assertEquals(0, currentCounter);
   }
   
-  private void verifyOutput(Path outputDir, FileSystem fs) throws IOException {
+  public static void verifyOutput(Path outputDir, FileSystem fs) throws IOException {
     FileStatus[] fileStatuses = fs.listStatus(outputDir);
     Path resultFile = null;
     boolean foundResult = false;

http://git-wip-us.apache.org/repos/asf/tez/blob/28f30b0e/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 5c6f855..cdf69e6 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -121,7 +121,7 @@ public class MultiAttemptDAG {
             + ", currentAttempt=" + getContext().getDAGAttemptNumber());
         if (successAttemptId > getContext().getDAGAttemptNumber()) {
           Runtime.getRuntime().halt(-1);
-        } else if (successAttemptId == getContext().getDAGAttemptNumber()) {
+        } else {
           LOG.info("Scheduling tasks for vertex=" + getContext().getVertexName());
           int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
           List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);


Mime
View raw message