tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-728. Semantics of output commit (bikas)
Date Fri, 17 Jan 2014 22:32:30 GMT
Updated Branches:
  refs/heads/master 4ba6f07b9 -> cdb62f900


TEZ-728. Semantics of output commit (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cdb62f90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cdb62f90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cdb62f90

Branch: refs/heads/master
Commit: cdb62f9003686e4408ee39addfba23eb6fa2c5d6
Parents: 4ba6f07
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Jan 17 14:32:18 2014 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Jan 17 14:32:18 2014 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |   4 +
 .../apache/tez/runtime/api/OutputCommitter.java |  27 +-
 .../tez/runtime/api/OutputCommitterContext.java |   2 -
 .../tez/dag/app/dag/DAGTerminationCause.java    |   3 +
 .../java/org/apache/tez/dag/app/dag/Vertex.java |   3 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 182 +++++++++++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  34 +--
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 256 ++++++++++++++++++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   8 +-
 9 files changed, 452 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cdb62f90/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 5b95f80..b5dac99 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -50,6 +50,10 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_LOG_LEVEL = TEZ_AM_PREFIX+"log.level";
   public static final String TEZ_AM_LOG_LEVEL_DEFAULT = "INFO";
 
+  public static final String TEZ_AM_ABORT_ALL_OUTPUTS_ON_DAG_FAILURE = 
+      TEZ_AM_PREFIX + "abort-all-outputs-on-dag-failure";
+  public static final boolean TEZ_AM_ABORT_ALL_OUTPUTS_ON_DAG_FAILURE_DEFAULT = true;
+  
   public static final String TEZ_AM_JAVA_OPTS = TEZ_AM_PREFIX
       + "java.opts";
   public static final String DEFAULT_TEZ_AM_JAVA_OPTS = " -Xmx1024m ";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cdb62f90/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
index f0cee4e..301d01b 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
@@ -20,8 +20,6 @@ package org.apache.tez.runtime.api;
 
 import org.apache.tez.dag.api.client.VertexStatus;
 
-import java.io.IOException;
-
 /**
  * OutputCommitter to "finalize" the output and make it user-visible if needed.
  * The OutputCommitter is allowed only on a terminal Output.
@@ -47,22 +45,25 @@ public abstract class OutputCommitter {
   public abstract void setupOutput() throws Exception;
 
   /**
-   * For committing the output after a successful vertex completion.
-   * Note that this is invoked for the outputs of vertices with a successful
-   * final state. This is called from the application master process.
-   * This is guaranteed to only be called once.
-   * If it throws an exception the entire vertex will fail.
-   *
+   * For committing the output after successful completion of tasks that write
+   * the output. Note that this is invoked for the outputs of vertices whose
+   * tasks have successfully completed. This is called from the application
+   * master process. Based on user configuration, commit is called at the end of
+   * the DAG execution for all outputs or immediately upon completion of all the
+   * tasks that produced the output. This is guaranteed to only be called once.
+   * 
    * @throws java.lang.Exception
    */
   public abstract void commitOutput() throws Exception;
 
   /**
-   * For aborting an unsuccessful vertex's output. Note that this is invoked for
-   * vertices with a final failed state. This is called from the application
-   * master process. This may be called multiple times.
-   *
-   * @param finalState final run-state of the vertex
+   * For aborting an output. Note that this is invoked for vertices with a final
+   * non-successful state. This is also called to abort a previously committed
+   * output in the case of a post-commit failure. This is called from the
+   * application master process. This may be called multiple times.
+   * 
+   * @param finalState
+   *          final run-state of the vertex
    * @throws java.lang.Exception
    */
   public abstract void abortOutput(VertexStatus.State finalState)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cdb62f90/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
index 2136073..3132758 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
@@ -20,8 +20,6 @@ package org.apache.tez.runtime.api;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
-import java.util.List;
-
 /**
  * Context through which the OutputCommitter can access all the relevant
  * information that it needs.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cdb62f90/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
index 3b097eb..f7020da 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGTerminationCause.java
@@ -36,5 +36,8 @@ public enum DAGTerminationCause {
   /** DAG failed during init. */
   INIT_FAILURE,
   
+  /** DAG failed during output commit. */
+  COMMIT_FAILURE,
+  
   INTERNAL_ERROR
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cdb62f90/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 9503058..134e5fe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -38,6 +38,7 @@ import org.apache.tez.dag.app.dag.impl.Edge;
 import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 
@@ -82,6 +83,8 @@ public interface Vertex extends Comparable<Vertex> {
 
   Map<Vertex, Edge> getInputVertices();
   Map<Vertex, Edge> getOutputVertices();
+  
+  Map<String, OutputCommitter> getOutputCommitters();
 
   void setAdditionalInputs(List<RootInputLeafOutputProto> inputs);
   void setAdditionalOutputs(List<RootInputLeafOutputProto> outputs);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cdb62f90/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index cd11a0f..4c9ec64 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -51,12 +52,14 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.ProgressBuilder;
 import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
@@ -90,8 +93,10 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.runtime.api.OutputCommitter;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 
 /** Implementation of Job interface. Maintains the state machines of Job.
  * The read and write calls use ReadWriteLock for concurrency.
@@ -117,6 +122,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final TaskAttemptListener taskAttemptListener;
   private final TaskHeartbeatHandler taskHeartbeatHandler;
   private final Object tasksSyncHandle = new Object();
+  
+  private volatile boolean committedOrAborted = false;
+  private volatile boolean allOutputsCommitted = false;
+  boolean abortAllOutputsOnFailure = true;
 
   @VisibleForTesting
   DAGScheduler dagScheduler;
@@ -597,6 +606,102 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       }
     }
   }
+  
+  private synchronized boolean commitOrAbortOutputs(boolean dagSucceeded) {
+    if (this.committedOrAborted) {
+      LOG.info("Ignoring multiple output commit/abort");
+      return this.allOutputsCommitted;
+    }
+    LOG.info("Calling DAG commit/abort for dag: " + getID());
+    this.committedOrAborted = true;
+    
+    boolean successfulOutputsAlreadyCommitted = !abortAllOutputsOnFailure;
+    boolean failedWhileCommitting = false;
+    final Set<OutputCommitter> committedOutputs = Sets.newHashSet();
+    if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
+      // commit all outputs
+      // we come here for successful dag completion and when outputs need to be
+      // committed at the end for all or none visibility
+      for (Vertex vertex : vertices.values()) {
+        Map<String, OutputCommitter> outputCommitters = vertex.getOutputCommitters();
+        if (outputCommitters == null || outputCommitters.isEmpty()) {
+          LOG.info("No output committers for vertex: " + vertex.getName());
+          continue;
+        }
+        for (Map.Entry<String, OutputCommitter> entry : outputCommitters.entrySet()) {
+          LOG.info("Committing output: " + entry.getKey() + " for vertex: "
+              + vertex.getVertexId());
+          if (vertex.getState() != VertexState.SUCCEEDED) {
+            throw new TezUncheckedException("Vertex: " + vertex.getName() + 
+                " not in SUCCEEDED state. State= " + vertex.getState());
+          }
+          final OutputCommitter committer = entry.getValue();
+          try {
+            getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                committer.commitOutput();
+                committedOutputs.add(committer);
+                return null;
+              }
+            });
+          } catch (Exception e) {
+            failedWhileCommitting = true;
+            LOG.info("Exception in committing output: " + entry.getKey()
+                + " for vertex: " + vertex.getVertexId(), e);
+          }
+          if (failedWhileCommitting) {
+            break;
+          }
+        }
+        if (failedWhileCommitting) {
+          break;
+        }
+      }
+    }
+    
+    if (failedWhileCommitting) {
+      LOG.info("DAG: " + getID() + " failed while committing");
+    }
+        
+    if (!dagSucceeded || failedWhileCommitting) {
+      // come here because dag failed or
+      // dag succeeded and all or none semantics were on and a commit failed
+      for (Vertex vertex : vertices.values()) {
+        Map<String, OutputCommitter> outputCommitters = vertex
+            .getOutputCommitters();
+        if (outputCommitters == null || outputCommitters.isEmpty()) {
+          LOG.info("No output committers for vertex: " + vertex.getName());
+          continue;
+        }
+        for (Map.Entry<String, OutputCommitter> entry : outputCommitters
+            .entrySet()) {
+          final OutputCommitter committer = entry.getValue();
+          if (abortAllOutputsOnFailure // abort all outputs on failure
+              || vertex.getState() != VertexState.SUCCEEDED // always abort non-successful outputs
+              ) {
+            LOG.info("Aborting output: " + entry.getKey() + " for vertex: "
+                + vertex.getVertexId());
+            try {
+              getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  committer.abortOutput(VertexStatus.State.FAILED);
+                  return null;
+                }
+              });
+            } catch (Exception e) {
+              LOG.info("Exception in aborting output: " + entry.getKey()
+                  + " for vertex: " + vertex.getVertexId(), e);
+            }
+          }
+          // else successful outputs have already been committed
+        }
+      }
+    }
+    allOutputsCommitted = !failedWhileCommitting;
+    return allOutputsCommitted;
+  }
 
   @Override
   /**
@@ -695,7 +800,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       //Only succeed if vertices complete successfully and no terminationCause is registered.
       if(dag.numSuccessfulVertices == dag.numVertices && dag.terminationCause == null) {
         dag.setFinishTime();
-        dag.logJobHistoryFinishedEvent();
         return dag.finished(DAGState.SUCCEEDED);
       }
       else if(dag.terminationCause == DAGTerminationCause.DAG_KILL ){
@@ -705,7 +809,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
             " killedVertices:" + dag.numKilledVertices;
         LOG.info(diagnosticMsg);
         dag.addDiagnostic(diagnosticMsg);
-        dag.abortJob(DAGStatus.State.KILLED);
         return dag.finished(DAGState.KILLED);
       }
       if(dag.terminationCause == DAGTerminationCause.VERTEX_FAILURE ){
@@ -715,7 +818,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
             " killedVertices:" + dag.numKilledVertices;
         LOG.info(diagnosticMsg);
         dag.addDiagnostic(diagnosticMsg);
-        dag.abortJob(DAGStatus.State.FAILED);
         return dag.finished(DAGState.FAILED);
       }
       else {
@@ -734,16 +836,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     return dag.getInternalState();
   }
 
-  DAGState finished(DAGState finalState) {
+  private synchronized DAGState finished(DAGState finalState) {
     // TODO Metrics
     /*
     if (getInternalState() == DAGState.RUNNING) {
       metrics.endRunningJob(this);
     }
     */
-    if (finishTime == 0) setFinishTime();
-    eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
-
     // TODO Metrics
     /*
     switch (finalState) {
@@ -757,8 +856,52 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         metrics.completedJob(this);
     }
     */
+
+    if (finishTime == 0) {
+      setFinishTime();
+    }
+    
+    boolean allOutputsCommitted = commitOrAbortOutputs(finalState == DAGState.SUCCEEDED);
+    
+    if (finalState == DAGState.SUCCEEDED && !allOutputsCommitted) {
+      finalState = DAGState.FAILED;
+      trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
+    }
+    
+    DAGStatus.State logState = getDAGStatusFromState(finalState);
+    if (logState == DAGStatus.State.SUCCEEDED) {
+      logJobHistoryFinishedEvent();
+    } else {
+      logJobHistoryUnsuccesfulEvent(logState);
+    }
+    
+    eventHandler.handle(new DAGAppMasterEventDAGFinished(getID(), finalState));
+
     return finalState;
   }
+  
+  private DAGStatus.State getDAGStatusFromState(DAGState finalState) {
+    switch (finalState) {
+      case NEW:
+        return DAGStatus.State.INITING;
+      case INITED:
+        return DAGStatus.State.INITING;
+      case RUNNING:
+        return DAGStatus.State.RUNNING;
+      case SUCCEEDED:
+        return DAGStatus.State.SUCCEEDED;
+      case FAILED:
+        return DAGStatus.State.FAILED;
+      case KILLED:
+        return DAGStatus.State.KILLED;
+      case ERROR:
+        return DAGStatus.State.ERROR;
+      case TERMINATING:
+        return DAGStatus.State.KILLED;
+      default:
+        throw new TezUncheckedException("Unknown DAGState: " + finalState);
+    }
+  }
 
   @Override
   public String getUserName() {
@@ -837,7 +980,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
   */
 
-  public static class InitTransition
+  private static class InitTransition
       implements MultipleArcTransition<DAGImpl, DAGEvent, DAGState> {
 
     /**
@@ -854,18 +997,18 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       //dag.metrics.preparingJob(dag);
 
       dag.initTime = dag.clock.getTime();
+      dag.abortAllOutputsOnFailure = dag.conf.getBoolean(
+          TezConfiguration.TEZ_AM_ABORT_ALL_OUTPUTS_ON_DAG_FAILURE,
+          TezConfiguration.TEZ_AM_ABORT_ALL_OUTPUTS_ON_DAG_FAILURE_DEFAULT);
 
       // If we have no vertices, fail the dag
       dag.numVertices = dag.getJobPlan().getVertexCount();
       if (dag.numVertices == 0) {
         dag.addDiagnostic("No vertices for dag");
         dag.trySetTerminationCause(DAGTerminationCause.ZERO_VERTICES);
-        dag.abortJob(DAGStatus.State.FAILED);
         return dag.finished(DAGState.FAILED);
       }
 
-      checkTaskLimits();
-
       // create the vertices
       for (int i=0; i < dag.numVertices; ++i) {
         String vertexName = dag.getJobPlan().getVertex(i).getName();
@@ -916,7 +1059,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           vertexId, vertexPlan, vertexName, dag.conf,
           dag.eventHandler, dag.taskAttemptListener, 
           dag.clock, dag.taskHeartbeatHandler,
-          dag.appContext, vertexLocationHint);
+          !dag.abortAllOutputsOnFailure, dag.appContext, vertexLocationHint);
       return v;
     }
 
@@ -952,13 +1095,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       vertex.setOutputVertices(outVertices);
     }
 
-    /**
-     * If the number of tasks are greater than the configured value
-     * throw an exception that will fail job initialization
-     */
-    private void checkTaskLimits() {
-      // no code, for now
-    }
   } // end of InitTransition
 
   public static class StartTransition
@@ -979,11 +1115,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     }
   }
 
-  private void abortJob(DAGStatus.State abortState) {
-    // TODO: DAG Committer
-    logJobHistoryUnsuccesfulEvent(abortState);
-  }
-
   Map<String, Vertex> vertexMap = new HashMap<String, Vertex>();
   void addVertex(Vertex v) {
     vertices.put(v.getVertexId(), v);
@@ -1038,7 +1169,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     @Override
     public void transition(DAGImpl job, DAGEvent event) {
       job.setFinishTime();
-      job.logJobHistoryUnsuccesfulEvent(DAGStatus.State.KILLED);
       job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
       job.finished(DAGState.KILLED);
     }
@@ -1049,7 +1179,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     @Override
     public void transition(DAGImpl job, DAGEvent event) {
       job.trySetTerminationCause(DAGTerminationCause.DAG_KILL);
-      job.abortJob(DAGStatus.State.KILLED);
       job.addDiagnostic("Job received Kill in INITED state.");
       job.finished(DAGState.KILLED);
     }
@@ -1239,7 +1368,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       job.enactKill(DAGTerminationCause.INTERNAL_ERROR,
           VertexTerminationCause.INTERNAL_ERROR);
       job.setFinishTime();
-      job.logJobHistoryUnsuccesfulEvent(DAGStatus.State.FAILED);
       job.finished(DAGState.ERROR);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cdb62f90/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index d785c78..96a655e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -460,6 +460,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private TezVertexID originalOneToOneSplitSource = null;
 
   private AtomicBoolean committed = new AtomicBoolean(false);
+  private AtomicBoolean aborted = new AtomicBoolean(false);
+  private boolean commitVertexOutputs = false;
+  
   private VertexLocationHint vertexLocationHint;
   private Map<String, LocalResource> localResources;
   private Map<String, String> environment;
@@ -472,21 +475,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
       String vertexName, Configuration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener, Clock clock,
-      // TODO: Recovery
-      //Map<TaskId, TaskInfo> completedTasksFromPreviousRun,
-      // TODO Metrics
-      //MRAppMetrics metrics,
-      TaskHeartbeatHandler thh,
+      TaskHeartbeatHandler thh, boolean commitVertexOutputs, 
       AppContext appContext, VertexLocationHint vertexLocationHint) {
     this.vertexId = vertexId;
     this.vertexPlan = vertexPlan;
     this.vertexName = StringInterner.weakIntern(vertexName);
     this.conf = conf;
-    //this.metrics = metrics;
     this.clock = clock;
-    // TODO: Recovery
-    //this.completedTasksFromPreviousRun = completedTasksFromPreviousRun;
     this.appContext = appContext;
+    this.commitVertexOutputs = commitVertexOutputs;
 
     this.taskAttemptListener = taskAttemptListener;
     this.taskHeartbeatHandler = thh;
@@ -1053,7 +1050,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
         LOG.info("Vertex succeeded: " + vertex.logIdentifier);
         try {
-          if (!vertex.committed.getAndSet(true)) {
+          if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
             // commit only once
             LOG.info("Invoking committer commit for vertex, vertexId="
                 + vertex.logIdentifier);
@@ -1179,11 +1176,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   private VertexState initializeVertex() {
-    // FIXME how do we decide vertex needs a committer?
-    // Answer: Do commit for every vertex
-    // for now, only for leaf vertices
-    // TODO TEZ-41 make commmitter type configurable per vertex
-
     if (!this.additionalOutputSpecs.isEmpty()) {
       try {
         LOG.info("Invoking committer inits for vertex, vertexId=" + logIdentifier);
@@ -1652,6 +1644,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
 
   private void abortVertex(final VertexStatus.State finalState) {
+    if (this.aborted.getAndSet(true)) {
+      LOG.info("Ignoring multiple aborts for vertex: " + logIdentifier);
+      return;
+    }
     LOG.info("Invoking committer abort for vertex, vertexId=" + logIdentifier);
     if (outputCommitters != null) {
       try {
@@ -1912,8 +1908,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      if (vertex.outputCommitters == null
-          || vertex.outputCommitters.isEmpty()) {
+      if (vertex.outputCommitters == null // no committer
+          || vertex.outputCommitters.isEmpty() // no committer
+          || !vertex.commitVertexOutputs) { // committer does not commit on vertex success
         LOG.info(vertex.getVertexId() + " back to running due to rescheduling "
             + ((VertexEventTaskReschedule)event).getTaskID());
         (new TaskRescheduledTransition()).transition(vertex, event);
@@ -2120,6 +2117,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
 
   }
+  
+  @Override
+  public Map<String, OutputCommitter> getOutputCommitters() {
+    return outputCommitters;
+  }
 
   @Private
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cdb62f90/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index c90f903..4c94d74 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -23,6 +23,10 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +39,7 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
@@ -43,6 +48,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
@@ -69,17 +75,21 @@ import org.apache.tez.dag.app.dag.event.VertexEvent;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.avro.HistoryEventType;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.OutputCommitter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import com.google.protobuf.ByteString;
+
 public class TestDAGImpl {
 
   private static final Log LOG = LogFactory.getLog(TestDAGImpl.class);
@@ -187,6 +197,14 @@ public class TestDAGImpl {
                     .setTaskModule("x1.y1")
                     .build()
                     )
+                .addOutputs(
+                    DAGProtos.RootInputLeafOutputProto.newBuilder()
+                    .setEntityDescriptor(
+                        TezEntityDescriptorProto.newBuilder().setClassName("output1").build()
+                    )
+                    .setName("output1")
+                    .setInitializerClassName(CountingOutputCommitter.class.getName())
+                 )
                     .addOutEdgeId("e1")
                     .build()
             )
@@ -202,14 +220,22 @@ public class TestDAGImpl {
                 )
                 .setTaskConfig(
                     PlanTaskConfiguration.newBuilder()
-                    .setNumTasks(2)
+                    .setNumTasks(1)
                     .setVirtualCores(4)
                     .setMemoryMb(1024)
                     .setJavaOpts("")
                     .setTaskModule("x2.y2")
                     .build()
                     )
-                    .addInEdgeId("e1")
+                .addOutputs(
+                    DAGProtos.RootInputLeafOutputProto.newBuilder()
+                    .setEntityDescriptor(
+                        TezEntityDescriptorProto.newBuilder().setClassName("output2").build()
+                    )
+                    .setName("output2")
+                    .setInitializerClassName(CountingOutputCommitter.class.getName())
+                 )
+                   .addInEdgeId("e1")
                     .addOutEdgeId("e2")
                     .build()
             )
@@ -226,15 +252,23 @@ public class TestDAGImpl {
                 )
                 .setTaskConfig(
                     PlanTaskConfiguration.newBuilder()
-                    .setNumTasks(2)
+                    .setNumTasks(1)
                     .setVirtualCores(4)
                     .setMemoryMb(1024)
                     .setJavaOpts("foo")
                     .setTaskModule("x3.y3")
                     .build()
                     )
-                    .addInEdgeId("e2")
-                    .build()
+               .addOutputs(
+                    DAGProtos.RootInputLeafOutputProto.newBuilder()
+                    .setEntityDescriptor(
+                        TezEntityDescriptorProto.newBuilder().setClassName("output3").build()
+                    )
+                    .setName("output3")
+                    .setInitializerClassName(CountingOutputCommitter.class.getName())
+               )
+               .addInEdgeId("e2")
+               .build()
             )
         .addEdge(
             EdgePlan.newBuilder()
@@ -511,6 +545,7 @@ public class TestDAGImpl {
         dispatcher.getEventHandler(),  taskAttemptListener,
         fsTokens, clock, "user", thh,
         mrrAppContext);
+    doReturn(conf).when(mrrAppContext).getAMConf();
     doReturn(mrrDag).when(mrrAppContext).getCurrentDAG();
     doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();
     taskEventDispatcher = new TaskEventDispatcher();
@@ -605,6 +640,217 @@ public class TestDAGImpl {
   
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
+  public void testDAGCompletionWithCommitSuccess() {
+    // Verifies output commit is deferred until every vertex succeeds; then each committer commits exactly once and none aborts.
+    initDAG(mrrDag);
+    dispatcher.await();
+    startDAG(mrrDag);
+    dispatcher.await();
+    for (int i=0; i<2; ++i) { // complete task 0 of vertex1 and vertex2
+      Vertex v = mrrDag.getVertex("vertex"+(i+1));
+      dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
+          TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
+      dispatcher.await();
+      Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+      Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
+    }
+    
+    // no commit yet: every committer is initialized and set up, but none has committed or aborted
+    for (Vertex v : mrrDag.vertices.values()) {
+      for (OutputCommitter c : v.getOutputCommitters().values()) {
+        CountingOutputCommitter committer= (CountingOutputCommitter) c;
+        Assert.assertEquals(0, committer.abortCounter);
+        Assert.assertEquals(0, committer.commitCounter);
+        Assert.assertEquals(1, committer.initCounter);
+        Assert.assertEquals(1, committer.setupCounter);
+      }
+    }
+    
+    // completing the last vertex (vertex3) completes the DAG, which then commits all outputs
+    Vertex v = mrrDag.getVertex("vertex3");
+    dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
+        TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+    Assert.assertEquals(3, mrrDag.getSuccessfulVertices());
+    Assert.assertEquals(DAGState.SUCCEEDED, mrrDag.getState());
+    // every committer committed exactly once and none aborted
+    for (Vertex vertex : mrrDag.vertices.values()) {
+      for (OutputCommitter c : vertex.getOutputCommitters().values()) {
+        CountingOutputCommitter committer= (CountingOutputCommitter) c;
+        Assert.assertEquals(0, committer.abortCounter);
+        Assert.assertEquals(1, committer.commitCounter);
+        Assert.assertEquals(1, committer.initCounter);
+        Assert.assertEquals(1, committer.setupCounter);
+      }
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout=5000)
+  public void testDAGCompletionWithCommitFailure() throws IOException {
+    // Verifies a commit that throws fails the DAG with COMMIT_FAILURE and aborts every output.
+    initDAG(mrrDag);
+    dispatcher.await();
+    // replace vertex3's output3 with a committer configured to throw on commit (assumed meaning of the first config flag; confirm)
+    Vertex badVertex = mrrDag.getVertex("vertex3");
+    List<RootInputLeafOutputProto> outputs =
+        new ArrayList<RootInputLeafOutputProto>();
+    outputs.add(RootInputLeafOutputProto.newBuilder()
+        .setInitializerClassName(CountingOutputCommitter.class.getName())
+        .setName("output3")
+        .setEntityDescriptor(
+            TezEntityDescriptorProto.newBuilder()
+                .setUserPayload(ByteString.copyFrom(
+                    new CountingOutputCommitter.CountingOutputCommitterConfig(
+                        true, false, false).toUserPayload())).build())
+        .build());
+    badVertex.setAdditionalOutputs(outputs);
+    
+    startDAG(mrrDag);
+    dispatcher.await();
+    for (int i=0; i<2; ++i) {
+      Vertex v = mrrDag.getVertex("vertex"+(i+1));
+      dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
+          TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
+      dispatcher.await();
+      Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+      Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
+    }
+    
+    // no commit yet: every committer is initialized and set up, but none has committed or aborted
+    for (Vertex v : mrrDag.vertices.values()) {
+      for (OutputCommitter c : v.getOutputCommitters().values()) {
+        CountingOutputCommitter committer= (CountingOutputCommitter) c;
+        Assert.assertEquals(0, committer.abortCounter);
+        Assert.assertEquals(0, committer.commitCounter);
+        Assert.assertEquals(1, committer.initCounter);
+        Assert.assertEquals(1, committer.setupCounter);
+      }
+    }
+    
+    // completing vertex3 completes the DAG; the commit exception fails it and all outputs are aborted
+    Vertex v = mrrDag.getVertex("vertex3");
+    dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
+        TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+    Assert.assertEquals(3, mrrDag.getSuccessfulVertices());
+    Assert.assertEquals(DAGState.FAILED, mrrDag.getState());
+    Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, mrrDag.getTerminationCause());
+    // every committer aborted exactly once; commitCounter is not checked here
+    for (Vertex vertex : mrrDag.vertices.values()) {
+      for (OutputCommitter c : vertex.getOutputCommitters().values()) {
+        CountingOutputCommitter committer= (CountingOutputCommitter) c;
+        Assert.assertEquals(1, committer.abortCounter);
+        Assert.assertEquals(1, committer.initCounter);
+        Assert.assertEquals(1, committer.setupCounter);
+      }
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test(timeout=5000)
+  public void testDAGErrorAbortAllOutputs() {
+    // Verifies a vertex internal error moves the DAG to ERROR and aborts every output without committing (relies on the default TEZ_AM_ABORT_ALL_OUTPUTS_ON_DAG_FAILURE; contrast testDAGErrorAbortNonSuccessfulOutputs).
+    initDAG(mrrDag);
+    dispatcher.await();
+    startDAG(mrrDag);
+    dispatcher.await();
+    for (int i=0; i<2; ++i) {
+      Vertex v = mrrDag.getVertex("vertex"+(i+1));
+      dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
+          TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
+      dispatcher.await();
+      Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+      Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
+    }
+    
+    // no commit yet: every committer is initialized and set up, but none has committed or aborted
+    for (Vertex v : mrrDag.vertices.values()) {
+      for (OutputCommitter c : v.getOutputCommitters().values()) {
+        CountingOutputCommitter committer= (CountingOutputCommitter) c;
+        Assert.assertEquals(0, committer.abortCounter);
+        Assert.assertEquals(0, committer.commitCounter);
+        Assert.assertEquals(1, committer.initCounter);
+        Assert.assertEquals(1, committer.setupCounter);
+      }
+    }
+    
+    // inject V_INTERNAL_ERROR into vertex3 before its task completes -> vertex ERROR -> DAG ERROR -> abort all outputs
+    Vertex v = mrrDag.getVertex("vertex3");
+    dispatcher.getEventHandler().handle(new VertexEvent(
+        v.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.ERROR, v.getState());
+    Assert.assertEquals(DAGState.ERROR, mrrDag.getState());
+    // every committer aborted exactly once and none committed
+    for (Vertex vertex : mrrDag.vertices.values()) {
+      for (OutputCommitter c : vertex.getOutputCommitters().values()) {
+        CountingOutputCommitter committer= (CountingOutputCommitter) c;
+        Assert.assertEquals(1, committer.abortCounter);
+        Assert.assertEquals(0, committer.commitCounter);
+        Assert.assertEquals(1, committer.initCounter);
+        Assert.assertEquals(1, committer.setupCounter);
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout=5000)
+  public void testDAGErrorAbortNonSuccessfulOutputs() {
+    // Verifies that with abort-all disabled, each vertex commits its outputs on success and a later DAG error aborts only the failed vertex's outputs.
+    conf.setBoolean(TezConfiguration.TEZ_AM_ABORT_ALL_OUTPUTS_ON_DAG_FAILURE, false); // NOTE(review): set after setup built mrrDag; assumes the flag is read no earlier than init (confirm)
+    initDAG(mrrDag);
+    dispatcher.await();
+    startDAG(mrrDag);
+    dispatcher.await();
+    for (int i=0; i<2; ++i) {
+      Vertex v = mrrDag.getVertex("vertex"+(i+1));
+      dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
+          TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
+      dispatcher.await();
+      Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+      Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
+      for (OutputCommitter c : v.getOutputCommitters().values()) { // outputs are committed as soon as the vertex succeeds
+        CountingOutputCommitter committer= (CountingOutputCommitter) c;
+        Assert.assertEquals(0, committer.abortCounter);
+        Assert.assertEquals(1, committer.commitCounter);
+        Assert.assertEquals(1, committer.initCounter);
+        Assert.assertEquals(1, committer.setupCounter);
+      }
+    }
+    
+    // internal error on vertex3 -> vertex ERROR -> DAG ERROR; outputs of already-succeeded vertices are not aborted
+    Vertex errorVertex = mrrDag.getVertex("vertex3");
+    dispatcher.getEventHandler().handle(new VertexEvent(
+        errorVertex.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.ERROR, errorVertex.getState());
+    // second drain, presumably so the DAG reacts to the vertex error; possibly redundant (confirm)
+    dispatcher.await();
+    Assert.assertEquals(DAGState.ERROR, mrrDag.getState());
+    // errorVertex outputs: aborted, never committed; other vertices: committed, never aborted
+    for (Vertex vertex : mrrDag.vertices.values()) {
+      for (OutputCommitter c : vertex.getOutputCommitters().values()) {
+        CountingOutputCommitter committer= (CountingOutputCommitter) c;
+        if (vertex == errorVertex) {
+          Assert.assertEquals(1, committer.abortCounter);
+          Assert.assertEquals(0, committer.commitCounter);
+          Assert.assertEquals(1, committer.initCounter);
+          Assert.assertEquals(1, committer.setupCounter);
+        } else {
+          Assert.assertEquals(0, committer.abortCounter);
+          Assert.assertEquals(1, committer.commitCounter);
+          Assert.assertEquals(1, committer.initCounter);
+          Assert.assertEquals(1, committer.setupCounter);          
+        }
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test(timeout=5000)
   public void testVertexReRunning() {
     initDAG(dag);
     dag.dagScheduler = mock(DAGScheduler.class);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cdb62f90/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 694bb07..41296bb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -1015,7 +1015,7 @@ public class TestVertexImpl {
       } else {
         v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
             dispatcher.getEventHandler(), taskAttemptListener,
-            clock, thh, appContext, locationHint);
+            clock, thh, true, appContext, locationHint);
       }
       vertices.put(vName, v);
       vertexIdMap.put(vertexId, v);
@@ -2005,7 +2005,7 @@ public class TestVertexImpl {
       VertexPlan vPlan = invalidDagPlan.getVertex(0);
       VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
           dispatcher.getEventHandler(), taskAttemptListener,
-          clock, thh, appContext, vertexLocationHint);
+          clock, thh, true, appContext, vertexLocationHint);
       vertexIdMap.put(vId, v);
       v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
       dispatcher.await();
@@ -2032,8 +2032,8 @@ public class TestVertexImpl {
         Clock clock, TaskHeartbeatHandler thh,
         AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher)
{
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
-          taskAttemptListener, clock, thh, appContext,
-          vertexLocationHint);
+          taskAttemptListener, clock, thh, true,
+          appContext, vertexLocationHint);
       this.dispatcher = dispatcher;
     }
 


Mime
View raw message