tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [1/2] TEZ-904. Committer recovery events should be out-of-band. (hitesh)
Date Fri, 14 Mar 2014 01:47:32 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 693f2ca92 -> f58508a56


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 5986657..0074a4c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -18,6 +18,16 @@
 
 package org.apache.tez.dag.history.recovery;
 
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,16 +43,6 @@ import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public class RecoveryService extends AbstractService {
 
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
@@ -120,7 +120,7 @@ public class RecoveryService extends AbstractService {
           synchronized (lock) {
             try {
               ++eventsProcessed;
-              handleEvent(event);
+              handleRecoveryEvent(event);
             } catch (Exception e) {
               // TODO handle failures - treat as fatal or ignore?
               LOG.warn("Error handling recovery event", e);
@@ -175,26 +175,56 @@ public class RecoveryService extends AbstractService {
       return;
     }
 
-    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
-      || eventType.equals(HistoryEventType.DAG_FINISHED)) {
-      // handle submissions and completion immediately
+    TezDAGID dagId = event.getDagID();
+    if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
+      DAGSubmittedEvent dagSubmittedEvent =
+          (DAGSubmittedEvent) event.getHistoryEvent();
+      String dagName = dagSubmittedEvent.getDAGName();
+      if (dagName != null
+          && dagName.startsWith(
+          TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
+        // Skip recording pre-warm DAG events
+        skippedDAGs.add(dagId);
+        return;
+      }
+    }
+    if (dagId == null || skippedDAGs.contains(dagId)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping event for DAG"
+            + ", eventType=" + eventType
+            + ", dagId=" + (dagId == null ? "null" : dagId.toString())
+            + ", isSkippedDAG=" + (dagId == null ? "null"
+            : skippedDAGs.contains(dagId)));
+      }
+      return;
+    }
+
+    if (event.getHistoryEvent() instanceof SummaryEvent) {
       synchronized (lock) {
         try {
-          handleEvent(event);
+          SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
+          handleSummaryEvent(dagId, eventType, summaryEvent);
           summaryStream.hsync();
-          if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
-            if (outputStreamMap.containsKey(event.getDagID())) {
-              doFlush(outputStreamMap.get(event.getDagID()),
-                  appContext.getClock().getTime(), true);
+          if (summaryEvent.writeToRecoveryImmediately()) {
+            handleRecoveryEvent(event);
+            doFlush(outputStreamMap.get(event.getDagID()),
+                appContext.getClock().getTime(), true);
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Queueing Non-immediate Summary/Recovery event of type"
+                  + eventType.name());
             }
-          } else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
-            completedDAGs.add(event.getDagID());
-            if (outputStreamMap.containsKey(event.getDagID())) {
+            eventQueue.add(event);
+          }
+          if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
+            LOG.info("DAG completed"
+                + ", dagId=" + event.getDagID()
+                + ", queueSize=" + eventQueue.size());
+            completedDAGs.add(dagId);
+            if (outputStreamMap.containsKey(dagId)) {
               try {
-                doFlush(outputStreamMap.get(event.getDagID()),
-                    appContext.getClock().getTime(), true);
-                outputStreamMap.get(event.getDagID()).close();
-                outputStreamMap.remove(event.getDagID());
+                outputStreamMap.get(dagId).close();
+                outputStreamMap.remove(dagId);
               } catch (IOException ioe) {
                 LOG.warn("Error when trying to flush/close recovery file for"
                     + " dag, dagId=" + event.getDagID());
@@ -207,87 +237,71 @@ public class RecoveryService extends AbstractService {
           LOG.warn("Error handling recovery event", e);
         }
       }
-      LOG.info("DAG completed"
-          + ", dagId=" + event.getDagID()
-          + ", queueSize=" + eventQueue.size());
     } else {
       // All other events just get queued
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Queueing Recovery event of type " + eventType.name());
+        LOG.debug("Queueing Non-Summary Recovery event of type " + eventType.name());
       }
       eventQueue.add(event);
     }
   }
 
+  private void handleSummaryEvent(TezDAGID dagID,
+      HistoryEventType eventType,
+      SummaryEvent summaryEvent) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Handling summary event"
+          + ", dagID=" + dagID
+          + ", eventType=" + eventType);
+    }
+    try {
+      if (summaryStream == null) {
+        Path summaryPath = new Path(recoveryPath,
+            appContext.getApplicationID()
+                + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+        if (!recoveryDirFS.exists(summaryPath)) {
+          summaryStream = recoveryDirFS.create(summaryPath, false,
+              bufferSize);
+        } else {
+          summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Writing recovery event to summary stream"
+            + ", dagId=" + dagID
+            + ", eventType=" + eventType);
+      }
+      summaryEvent.toSummaryProtoStream(summaryStream);
+    } catch (IOException ioe) {
+      // FIXME handle failures
+      LOG.warn("Failed to write to stream", ioe);
+    }
+
 
-  private void handleEvent(DAGHistoryEvent event) {
+  }
+
+  private void handleRecoveryEvent(DAGHistoryEvent event) {
     HistoryEventType eventType = event.getHistoryEvent().getEventType();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Handling recovery event of type "
           + event.getHistoryEvent().getEventType());
     }
-    if (event.getDagID() == null) {
-      // AM event
-      // anything to be done?
-      // TODO
-      LOG.info("Skipping Recovery Event as DAG is null"
-          + ", eventType=" + event.getHistoryEvent().getEventType());
-      return;
-    }
-
     TezDAGID dagID = event.getDagID();
-    if (completedDAGs.contains(dagID)
-        || skippedDAGs.contains(dagID)) {
-      // Skip events for completed and skipped DAGs
+
+    if (completedDAGs.contains(dagID)) {
       // no need to recover completed DAGs
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping Recovery Event as either completed or skipped"
+        LOG.debug("Skipping Recovery Event as DAG completed"
             + ", dagId=" + dagID
             + ", completed=" + completedDAGs.contains(dagID)
             + ", skipped=" + skippedDAGs.contains(dagID)
-            + ", eventType=" + event.getHistoryEvent().getEventType());
+            + ", eventType=" + eventType);
       }
       return;
     }
 
     try {
 
-      if (summaryStream == null) {
-        Path summaryPath = new Path(recoveryPath,
-            appContext.getApplicationID()
-                + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
-        if (!recoveryDirFS.exists(summaryPath)) {
-          summaryStream = recoveryDirFS.create(summaryPath, false,
-              bufferSize);
-        } else {
-          summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
-        }
-      }
-
-      if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
-          || eventType.equals(HistoryEventType.DAG_FINISHED)) {
-        if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
-          DAGSubmittedEvent dagSubmittedEvent =
-              (DAGSubmittedEvent) event.getHistoryEvent();
-          String dagName = dagSubmittedEvent.getDAGName();
-          if (dagName != null
-              && dagName.startsWith(
-              TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
-            // Skip recording pre-warm DAG events
-            skippedDAGs.add(dagID);
-            return;
-          }
-        }
-        SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Writing recovery event to summary stream"
-              + ", dagId=" + dagID
-              + ", type="
-              + event.getHistoryEvent().getEventType());
-        }
-        summaryEvent.toSummaryProtoStream(summaryStream);
-      }
-
       if (!outputStreamMap.containsKey(dagID)) {
         Path dagFilePath = new Path(recoveryPath,
             dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
@@ -313,8 +327,7 @@ public class RecoveryService extends AbstractService {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Writing recovery event to output stream"
             + ", dagId=" + dagID
-            + ", type="
-            + event.getHistoryEvent().getEventType());
+            + ", eventType=" + eventType);
       }
       ++unflushedEventsCount;
       outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
@@ -339,11 +352,9 @@ public class RecoveryService extends AbstractService {
         && ((currentTime - lastFlushTime) >= (flushInterval*1000))) {
       doFlush = true;
     }
-
     if (!doFlush) {
       return;
     }
-
     doFlush(outputStream, currentTime, false);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 65f3aaf..c640baa 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -59,6 +59,7 @@ message DAGStartedProto {
 message DAGCommitStartedProto {
   optional string dag_id = 1;
 }
+
 message DAGFinishedProto {
   optional string dag_id = 1;
   optional int64 finish_time = 2;
@@ -99,6 +100,20 @@ message VertexCommitStartedProto {
   optional string vertex_id = 1;
 }
 
+message VertexCommitFinishedProto {
+  optional string vertex_id = 1;
+}
+
+message VertexGroupCommitStartedProto {
+  optional string dag_id = 1;
+  optional string vertex_group_name = 2;
+}
+
+message VertexGroupCommitFinishedProto {
+  optional string dag_id = 1;
+  optional string vertex_group_name = 2;
+}
+
 message VertexFinishedProto {
   optional string vertex_name = 1;
   optional string vertex_id = 2;
@@ -162,4 +177,10 @@ message SummaryEventProto {
   optional string dag_id = 1;
   optional int64 timestamp = 2;
   optional int32 event_type = 3;
+  optional bytes event_payload = 4;
+}
+
+message VertexFinishStateProto {
+  optional string vertex_id = 1;
+  optional int32 state = 2;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index f03863c..2cf3eaf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -35,10 +35,12 @@ import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
 import org.apache.tez.runtime.RuntimeUtils;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
@@ -78,6 +80,26 @@ public class TestHistoryEventsProtoConversion {
     return deserializedEvent;
   }
 
+  private HistoryEvent testSummaryProtoConversion(HistoryEvent historyEvent)
+      throws IOException {
+    SummaryEvent event = (SummaryEvent) historyEvent;
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    HistoryEvent deserializedEvent = null;
+    event.toSummaryProtoStream(os);
+    os.flush();
+    os.close();
+    LOG.info("Serialized event to byte array"
+        + ", eventType=" + historyEvent.getEventType()
+        + ", bufLen=" + os.toByteArray().length);
+    SummaryEventProto summaryEventProto =
+        SummaryEventProto.parseDelimitedFrom(
+            new ByteArrayInputStream(os.toByteArray()));
+    deserializedEvent = RuntimeUtils.createClazzInstance(
+        event.getClass().getName());
+    ((SummaryEvent)deserializedEvent).fromSummaryProtoStream(summaryEventProto);
+    return deserializedEvent;
+  }
+
   private void logEvents(HistoryEvent event,
       HistoryEvent deserializedEvent) {
     LOG.info("Initial Event toString: " + event.toString());
@@ -485,7 +507,7 @@ public class TestHistoryEventsProtoConversion {
 
   private void testDAGCommitStartedEvent() throws Exception {
     DAGCommitStartedEvent event = new DAGCommitStartedEvent(
-        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1));
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100l);
     DAGCommitStartedEvent deserializedEvent =
         (DAGCommitStartedEvent) testProtoConversion(event);
     Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
@@ -495,13 +517,56 @@ public class TestHistoryEventsProtoConversion {
   private void testVertexCommitStartedEvent() throws Exception {
     VertexCommitStartedEvent event = new VertexCommitStartedEvent(
         TezVertexID.getInstance(
-            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1));
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), 100l);
     VertexCommitStartedEvent deserializedEvent =
         (VertexCommitStartedEvent) testProtoConversion(event);
     Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
     logEvents(event, deserializedEvent);
   }
 
+  private void testVertexGroupCommitStartedEvent() throws Exception {
+    VertexGroupCommitStartedEvent event = new VertexGroupCommitStartedEvent(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1),
+        "fooGroup", 1000344l);
+    {
+      VertexGroupCommitStartedEvent deserializedEvent =
+          (VertexGroupCommitStartedEvent) testProtoConversion(event);
+      Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
+      Assert.assertEquals(event.getVertexGroupName(),
+          deserializedEvent.getVertexGroupName());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      VertexGroupCommitStartedEvent deserializedEvent =
+          (VertexGroupCommitStartedEvent) testSummaryProtoConversion(event);
+      Assert.assertEquals(event.getVertexGroupName(),
+          deserializedEvent.getVertexGroupName());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+  private void testVertexGroupCommitFinishedEvent() throws Exception {
+    VertexGroupCommitFinishedEvent event = new VertexGroupCommitFinishedEvent(
+        TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1),
+        "fooGroup", 1000344l);
+    {
+      VertexGroupCommitFinishedEvent deserializedEvent =
+          (VertexGroupCommitFinishedEvent) testProtoConversion(event);
+      Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
+      Assert.assertEquals(event.getVertexGroupName(),
+          deserializedEvent.getVertexGroupName());
+      logEvents(event, deserializedEvent);
+    }
+    {
+      VertexGroupCommitFinishedEvent deserializedEvent =
+          (VertexGroupCommitFinishedEvent) testSummaryProtoConversion(event);
+      Assert.assertEquals(event.getVertexGroupName(),
+          deserializedEvent.getVertexGroupName());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
+
   @Test
   public void testDefaultProtoConversion() throws Exception {
     for (HistoryEventType eventType : HistoryEventType.values()) {
@@ -560,6 +625,12 @@ public class TestHistoryEventsProtoConversion {
         case VERTEX_COMMIT_STARTED:
           testVertexCommitStartedEvent();
           break;
+        case VERTEX_GROUP_COMMIT_STARTED:
+          testVertexGroupCommitStartedEvent();
+          break;
+        case VERTEX_GROUP_COMMIT_FINISHED:
+          testVertexGroupCommitFinishedEvent();
+          break;
         default:
           throw new Exception("Unhandled Event type in Unit tests: " + eventType);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
new file mode 100644
index 0000000..2716fdd
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.test.dag.MultiAttemptDAG;
+import org.apache.tez.test.dag.SimpleVTestDAG;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Random;
+
+public class TestDAGRecovery2 {
+
+  private static final Log LOG = LogFactory.getLog(TestDAGRecovery2.class);
+
+  private static Configuration conf = new Configuration();
+  private static MiniTezCluster miniTezCluster;
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestDAGRecovery2.class.getName() + "-tmpDir";
+  protected static MiniDFSCluster dfsCluster;
+
+  private static TezSession tezSession = null;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    LOG.info("Starting mini clusters");
+    FileSystem remoteFs = null;
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+          .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+    if (miniTezCluster == null) {
+      miniTezCluster = new MiniTezCluster(TestDAGRecovery2.class.getName(),
+          1, 1, 1);
+      Configuration miniTezconf = new Configuration(conf);
+      miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
+      miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      miniTezCluster.init(miniTezconf);
+      miniTezCluster.start();
+
+      Path remoteStagingDir = remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
+          .valueOf(new Random().nextInt(100000))));
+      TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+
+      TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+      tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 10);
+      tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+          remoteStagingDir.toString());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+      tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
+      tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
+      tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, " -Xmx256m");
+
+      AMConfiguration amConfig = new AMConfiguration(
+          new HashMap<String, String>(), new HashMap<String, LocalResource>(),
+          tezConf, null);
+      TezSessionConfiguration tezSessionConfig =
+          new TezSessionConfiguration(amConfig, tezConf);
+      tezSession = new TezSession("TestDAGRecovery2", tezSessionConfig);
+      tezSession.start();
+    }
+  }
+  void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
+    TezSessionStatus status = tezSession.getSessionStatus();
+    while (status != TezSessionStatus.READY && status != TezSessionStatus.SHUTDOWN) {
+      LOG.info("Waiting for session to be ready. Current: " + status);
+      Thread.sleep(100);
+      status = tezSession.getSessionStatus();
+    }
+    if (status == TezSessionStatus.SHUTDOWN) {
+      throw new TezUncheckedException("Unexpected Session shutdown");
+    }
+    DAGClient dagClient = tezSession.submitDAG(dag);
+    DAGStatus dagStatus = dagClient.getDAGStatus(null);
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for dag to complete. Sleeping for 500ms."
+          + " DAG name: " + dag.getName()
+          + " DAG appId: " + dagClient.getApplicationId()
+          + " Current state: " + dagStatus.getState());
+      Thread.sleep(100);
+      dagStatus = dagClient.getDAGStatus(null);
+    }
+
+    Assert.assertEquals(finalState, dagStatus.getState());
+  }
+
+  @Test(timeout=120000)
+  public void testBasicRecovery() throws Exception {
+    DAG dag = SimpleVTestDAG.createDAG("FailingCommitterDAG", null);
+    OutputDescriptor od =
+        new OutputDescriptor(MultiAttemptDAG.NoOpOutput.class.getName());
+    od.setUserPayload(new
+        MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig(true)
+            .toUserPayload());
+    dag.getVertex("v3").addOutput("FailingOutput", od,
+        MultiAttemptDAG.FailingOutputCommitter.class);
+    runDAGAndVerify(dag, State.FAILED);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index 086e458..6a25f13 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -216,6 +216,11 @@ public class TestProcessor implements LogicalIOProcessor {
         LOG.info(msg);
     }
     for (Map.Entry<String, LogicalOutput> entry : outputs.entrySet()) {
+      if (!(entry.getValue() instanceof TestOutput)) {
+        LOG.info("Ignoring non TestOutput: " + entry.getKey()
+            + " outputClass= " + entry.getValue().getClass().getSimpleName());
+        continue;
+      }
       LOG.info("Writing output: " + entry.getKey() + " sum= " + sum);
       TestOutput output = (TestOutput) entry.getValue();
       output.write(sum);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f58508a5/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 65c8383..80d8588 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.test.dag;
 
+import com.google.common.primitives.Booleans;
+import com.google.common.primitives.Ints;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -30,15 +32,24 @@ import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.client.VertexStatus.State;
 import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.MemoryUpdateCallback;
+import org.apache.tez.runtime.api.OutputCommitter;
+import org.apache.tez.runtime.api.OutputCommitterContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.test.TestInput;
 import org.apache.tez.test.TestOutput;
 import org.apache.tez.test.TestProcessor;
+import org.apache.tez.test.dag.MultiAttemptDAG.FailingOutputCommitter.FailingOutputCommitterConfig;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -56,6 +67,10 @@ public class MultiAttemptDAG {
       "tez.multi-attempt-dag.vertex.num-tasks";
   public static int MULTI_ATTEMPT_DAG_VERTEX_NUM_TASKS_DEFAULT = 2;
 
+  public static String MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER =
+      "tez.multi-attempt-dag.use-failing-committer";
+  public static boolean MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER_DEFAULT = false;
+
   public static class FailOnAttemptVertexManagerPlugin
       implements VertexManagerPlugin {
     private int numSourceTasks = 0;
@@ -130,6 +145,100 @@ public class MultiAttemptDAG {
     }
   }
 
+  public static class FailingOutputCommitter extends OutputCommitter {
+
+    boolean failOnCommit = false;
+
+    @Override
+    public void initialize(OutputCommitterContext context) throws Exception {
+      FailingOutputCommitterConfig config = new
+          FailingOutputCommitterConfig();
+      config.fromUserPayload(context.getUserPayload());
+      failOnCommit = config.failOnCommit;
+    }
+
+    @Override
+    public void setupOutput() throws Exception {
+
+    }
+
+    @Override
+    public void commitOutput() throws Exception {
+      if (failOnCommit) {
+        LOG.info("Committer causing AM to shutdown");
+        Runtime.getRuntime().halt(-1);
+      }
+    }
+
+    @Override
+    public void abortOutput(State finalState) throws Exception {
+
+    }
+
+    public static class FailingOutputCommitterConfig {
+      boolean failOnCommit;
+
+      public FailingOutputCommitterConfig() {
+        this(false);
+      }
+
+      public FailingOutputCommitterConfig(boolean failOnCommit) {
+        this.failOnCommit = failOnCommit;
+      }
+
+      public byte[] toUserPayload() {
+        return Ints.toByteArray((failOnCommit ? 1 : 0));
+      }
+
+      public void fromUserPayload(byte[] userPayload) {
+        int failInt = Ints.fromByteArray(userPayload);
+        if (failInt == 0) {
+          failOnCommit = false;
+        } else {
+          failOnCommit = true;
+        }
+      }
+    }
+  }
+
+  public static class NoOpOutput implements LogicalOutput, MemoryUpdateCallback {
+
+    @Override
+    public void setNumPhysicalOutputs(int numOutputs) {
+
+    }
+
+    @Override
+    public List<Event> initialize(TezOutputContext outputContext) throws Exception {
+      outputContext.requestInitialMemory(1l, this);
+      return null;
+    }
+
+    @Override
+    public void start() throws Exception {
+
+    }
+
+    @Override
+    public Writer getWriter() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void handleEvents(List<Event> outputEvents) {
+
+    }
+
+    @Override
+    public List<Event> close() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void memoryAssigned(long assignedSize) {
+    }
+  }
+
 
   public static DAG createDAG(String name,
       Configuration conf) throws Exception {


Mime
View raw message