tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3271. Provide mapreduce failures.maxpercent equivalent. (jeagles)
Date Mon, 05 Dec 2016 19:21:03 GMT
Repository: tez
Updated Branches:
  refs/heads/master 43ca78fea -> a33d2213b


TEZ-3271. Provide mapreduce failures.maxpercent equivalent. (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a33d2213
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a33d2213
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a33d2213

Branch: refs/heads/master
Commit: a33d2213b54cd3dfddbff4155d047674a4416f35
Parents: 43ca78f
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Mon Dec 5 13:18:33 2016 -0600
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Mon Dec 5 13:18:33 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/api/TezConfiguration.java    |  10 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  28 +++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  70 +++++++++--
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 116 ++++++++++++++++++-
 .../java/org/apache/tez/test/TestTezJobs.java   |  50 ++++++++
 6 files changed, 262 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d9a7ca6..6bbbfb2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3271. Provide mapreduce failures.maxpercent equivalent.
   TEZ-3222. Reduce messaging overhead for auto-reduce parallelism case.
   TEZ-3547. Add TaskAssignment Analyzer.
   TEZ-3508. TestTaskScheduler cleanup.

http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index a09e888..e27cdf8 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -994,7 +994,15 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_PREEMPTION_PERCENTAGE =
       TEZ_AM_PREFIX + "preemption.percentage";
   public static final int TEZ_AM_PREEMPTION_PERCENTAGE_DEFAULT = 10;
-  
+
+  /**
+   * Float value. Specifies the allowable percentage in the range 0.0-100.0f of task
+   * failures per vertex that will allow the vertex to succeed with failures.
+   */
+  @ConfigurationScope(Scope.VERTEX)
+  public static final String TEZ_VERTEX_FAILURES_MAXPERCENT =
+          "tez.vertex.failures.maxpercent";
+  public static final float TEZ_VERTEX_FAILURES_MAXPERCENT_DEFAULT = 0.0f;
   /**
    * Int value. The number of RM heartbeats to wait after preempting running tasks before preempting
    * more running tasks. After preempting a task, we need to wait at least 1 heartbeat so that the 

http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 9640f06..690df63 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -23,9 +23,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.Deflater;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -68,6 +71,31 @@ public class Edge {
 
   private static final Logger LOG = LoggerFactory.getLogger(Edge.class);
 
+  public List<TezEvent> generateEmptyEventsForAttempt(TezTaskAttemptID attempt) throws Exception {
+
+    if (!edgeProperty.getEdgeSource().getClassName().startsWith("org.apache.tez")) {
+      throw new TezException("Only org.apache.tez outputs are allowed for max percent failure feature. Disallowed Output: "
+          + edgeProperty.getEdgeSource().getClassName());
+    }
+    List<Event> events = new ArrayList<>();
+    Deflater deflater = TezCommonUtils.newBestCompressionDeflater();
+    try {
+      ShuffleUtils.generateEventsForNonStartedOutput(events,
+          edgeManager.getNumDestinationConsumerTasks(attempt.getTaskID().getId()), null, false, true, deflater);
+    } catch (Exception e) {
+      throw new TezException(e);
+    }
+    EventMetaData sourceInfo = new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT,
+        sourceVertex.getName(), getDestinationVertexName(), attempt);
+
+    List<TezEvent> tezEvents = new ArrayList<>(events.size());
+    for (Event e : events) {
+      TezEvent tezEvent = new TezEvent(e, sourceInfo);
+      tezEvents.add(tezEvent);
+    }
+    return tezEvents;
+  }
+
   class EdgeManagerPluginContextImpl implements EdgeManagerPluginContext {
 
     private final UserPayload userPayload;

http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3f6debf..bf291b7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -255,6 +255,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   final ServicePluginInfo servicePluginInfo;
 
 
+  private final float maxFailuresPercent;
+  private boolean logSuccessDiagnostics = false;
+
   //fields initialized in init
 
   @VisibleForTesting
@@ -960,7 +963,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     if (isSpeculationEnabled()) {
       speculator = new LegacySpeculator(vertexConf, getAppContext(), this);
     }
-    
+
+    maxFailuresPercent = vertexConf.getFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT,
+            TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT_DEFAULT);
 
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
@@ -1987,7 +1992,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   void logJobHistoryVertexFinishedEvent() throws IOException {
     if (recoveryData == null
         || !recoveryData.isVertexSucceeded()) {
-      logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime, "",
+      logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime,
+          logSuccessDiagnostics ? StringUtils.join(getDiagnostics(), LINE_SEPARATOR) : "",
           getAllCounters());
     }
   }
@@ -2121,10 +2127,52 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     if (vertex.completedTaskCount == vertex.tasks.size()) {
       // finished - gather stats
       vertex.finalStatistics = vertex.constructStatistics();
-      
-      //Only succeed if tasks complete successfully and no terminationCause is registered.
-      if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
-        LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier);
+
+      //Only succeed if tasks complete successfully and no terminationCause is registered or if failures are below configured threshold.
+      boolean vertexSucceeded = vertex.succeededTaskCount == vertex.numTasks;
+      boolean vertexFailuresBelowThreshold = (vertex.succeededTaskCount + vertex.failedTaskCount == vertex.numTasks)
+          && (vertex.failedTaskCount * 100 <= vertex.maxFailuresPercent * vertex.numTasks);
+
+      if((vertexSucceeded || vertexFailuresBelowThreshold) && vertex.terminationCause == null) {
+        if(vertexSucceeded) {
+          LOG.info("All tasks have succeeded, vertex:" + vertex.logIdentifier);
+        } else {
+          LOG.info("All tasks in the vertex " + vertex.logIdentifier + " have completed and the percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less that the threshold of " + vertex.maxFailuresPercent);
+          vertex.addDiagnostic("Vertex succeeded as percentage of failed tasks (failed/total) (" + vertex.failedTaskCount + "/" + vertex.numTasks + ") is less that the threshold of " + vertex.maxFailuresPercent);
+          vertex.logSuccessDiagnostics = true;
+          for (Task task : vertex.tasks.values()) {
+            if (!task.getState().equals(TaskState.FAILED)) {
+              continue;
+            }
+            // Find the last attempt and mark that as successful
+            Iterator<TezTaskAttemptID> attempts = task.getAttempts().keySet().iterator();
+            TezTaskAttemptID lastAttempt = null;
+            while (attempts.hasNext()) {
+              TezTaskAttemptID attempt = attempts.next();
+              if (lastAttempt == null || attempt.getId() > lastAttempt.getId()) {
+                lastAttempt = attempt;
+              }
+            }
+            LOG.info("Succeeding failed task attempt:" + lastAttempt);
+            for (Map.Entry<Vertex, Edge> vertexEdge : vertex.targetVertices.entrySet()) {
+              Vertex destVertex = vertexEdge.getKey();
+              Edge edge = vertexEdge.getValue();
+              try {
+                List<TezEvent> tezEvents = edge.generateEmptyEventsForAttempt(lastAttempt);
+
+                // Downstream vertices need to receive a SUCCEEDED completion event for each failed task to ensure num bipartite count is correct
+                VertexEventTaskAttemptCompleted completionEvent = new VertexEventTaskAttemptCompleted(lastAttempt, TaskAttemptStateInternal.SUCCEEDED);
+
+                // Notify all target vertices
+                vertex.eventHandler.handle(new VertexEventSourceTaskAttemptCompleted(destVertex.getVertexId(), completionEvent));
+                vertex.eventHandler.handle(new VertexEventRouteEvent(destVertex.getVertexId(), tezEvents));
+              } catch (Exception e) {
+                throw new TezUncheckedException(e);
+              }
+            }
+          }
+        }
+
         if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
           // start commit if there're commits or just finish if no commits
           return commitOrFinish(vertex);
@@ -3440,11 +3488,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           vertex.completedTasksStatsCache.mergeFrom(((TaskImpl) task).getStatistics());
         }
       } else if (taskEvent.getState() == TaskState.FAILED) {
-        LOG.info("Failing vertex: " + vertex.logIdentifier +
-            " because task failed: " + taskEvent.getTaskID());
-        vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE);
-        forceTransitionToKillWait = true;
         taskFailed(vertex, task);
+        if (vertex.failedTaskCount * 100 > vertex.maxFailuresPercent * vertex.numTasks) {
+          LOG.info("Failing vertex: " + vertex.logIdentifier +
+                  " because task failed: " + taskEvent.getTaskID());
+          vertex.tryEnactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OTHER_TASK_FAILURE);
+          forceTransitionToKillWait = true;
+        }
       } else if (taskEvent.getState() == TaskState.KILLED) {
         taskKilled(vertex, task);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index b7e63f6..a11311d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -56,11 +56,14 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
 import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.VertexStatistics;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
 import org.apache.tez.test.GraceShuffleVertexManagerForTest;
@@ -1823,7 +1826,7 @@ public class TestVertexImpl {
                 EdgePlan.newBuilder()
                     .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v4"))
                     .setInputVertexName("vertex4")
-                    .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o4"))
+                    .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("org.apache.tez.o4"))
                     .setOutputVertexName("vertex6")
                     .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
                     .setId("e5")
@@ -1835,7 +1838,7 @@ public class TestVertexImpl {
                 EdgePlan.newBuilder()
                     .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v5"))
                     .setInputVertexName("vertex5")
-                    .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o5"))
+                    .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("org.apache.tez.o5"))
                     .setOutputVertexName("vertex6")
                     .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
                     .setId("e6")
@@ -3740,10 +3743,119 @@ public class TestVertexImpl {
 
     Assert.assertEquals(VertexState.SUCCEEDED, v4.getState());
     Assert.assertEquals(VertexState.SUCCEEDED, v5.getState());
+    Assert.assertEquals(VertexState.RUNNING, v6.getState());
+    Assert.assertEquals(4, v6.numSuccessSourceAttemptCompletions);
+
+  }
+
+  @Test(timeout = 5000)
+  public void testFailuresMaxPercentSourceTaskAttemptCompletionEvents() throws TezException {
+    LOG.info("Testing testFailuresMaxPercentSourceTaskAttemptCompletionEvents");
 
+    // Override the basic setup for this test to inject the specific config setting needed for this test
+    useCustomInitializer = false;
+    customInitializer = null;
+    setupPreDagCreation();
+    conf.setFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT, 50.0f);
+    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    dagPlan = createTestDAGPlan();
+    setupPostDagCreation();
+    initAllVertices(VertexState.INITED);
+
+    VertexImpl v4 = vertices.get("vertex4");
+    VertexImpl v5 = vertices.get("vertex5");
+    VertexImpl v6 = vertices.get("vertex6");
+
+    startVertex(vertices.get("vertex1"));
+    startVertex(vertices.get("vertex2"));
+    dispatcher.await();
+    LOG.info("Verifying v6 state " + v6.getState());
+    Assert.assertEquals(VertexState.RUNNING, v6.getState());
+
+    TezTaskID t1_v4 = TezTaskID.getInstance(v4.getVertexId(), 0);
+    TezTaskID t2_v4 = TezTaskID.getInstance(v4.getVertexId(), 1);
+    TezTaskID t1_v5 = TezTaskID.getInstance(v5.getVertexId(), 0);
+    TezTaskID t2_v5 = TezTaskID.getInstance(v5.getVertexId(), 1);
+
+    TezTaskAttemptID ta1_t1_v4 = TezTaskAttemptID.getInstance(t1_v4, 0);
+    TezTaskAttemptID ta1_t2_v4 = TezTaskAttemptID.getInstance(t2_v4, 0);
+    TezTaskAttemptID ta1_t1_v5 = TezTaskAttemptID.getInstance(t1_v5, 0);
+    TezTaskAttemptID ta1_t2_v5 = TezTaskAttemptID.getInstance(t2_v5, 0);
+
+    TaskSpec taskSpec = new TaskSpec("dag", "vertex", 2, new ProcessorDescriptor(), new ArrayList<InputSpec>(),
+        new ArrayList<OutputSpec>(), null, conf);
+    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(null, null);
+
+    // Tasks can only succeed from a scheduled or running state
+    dispatcher.getEventHandler().handle(new TaskEventScheduleTask(t1_v4, taskSpec, locationHint, false));
+    dispatcher.getEventHandler().handle(new TaskEventScheduleTask(t2_v4, taskSpec, locationHint, false));
+
+    // Completed tasks are less that the max percent failure
+    dispatcher.getEventHandler().handle(new TaskEventTAFailed(ta1_t1_v4, TaskFailureType.NON_FATAL, null));
+    dispatcher.getEventHandler().handle(new TaskEventTASucceeded(ta1_t2_v4));
+    dispatcher.getEventHandler().handle(new TaskEventTASucceeded(ta1_t1_v5));
+    dispatcher.getEventHandler().handle(new TaskEventTAFailed(ta1_t2_v5, TaskFailureType.NON_FATAL, null));
+    dispatcher.await();
+
+    Assert.assertEquals(VertexState.SUCCEEDED, v4.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v5.getState());
     Assert.assertEquals(VertexState.RUNNING, v6.getState());
     Assert.assertEquals(4, v6.numSuccessSourceAttemptCompletions);
+  }
 
+  @Test(timeout = 5000)
+  public void testFailuresMaxPercentExceededSourceTaskAttemptCompletionEvents() throws TezException {
+    LOG.info("Testing testFailuresMaxPercentSourceTaskAttemptCompletionEvents");
+
+    // Override the basic setup for this test to inject the specific config setting needed for this test
+    useCustomInitializer = false;
+    customInitializer = null;
+    setupPreDagCreation();
+    conf.setFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT, 50.0f);
+    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    dagPlan = createTestDAGPlan();
+    setupPostDagCreation();
+    initAllVertices(VertexState.INITED);
+
+    VertexImpl v4 = vertices.get("vertex4");
+    VertexImpl v5 = vertices.get("vertex5");
+    VertexImpl v6 = vertices.get("vertex6");
+
+    startVertex(vertices.get("vertex1"));
+    startVertex(vertices.get("vertex2"));
+    dispatcher.await();
+    LOG.info("Verifying v6 state " + v6.getState());
+    Assert.assertEquals(VertexState.RUNNING, v6.getState());
+
+    TezTaskID t1_v4 = TezTaskID.getInstance(v4.getVertexId(), 0);
+    TezTaskID t2_v4 = TezTaskID.getInstance(v4.getVertexId(), 1);
+    TezTaskID t1_v5 = TezTaskID.getInstance(v5.getVertexId(), 0);
+    TezTaskID t2_v5 = TezTaskID.getInstance(v5.getVertexId(), 1);
+
+    TezTaskAttemptID ta1_t1_v4 = TezTaskAttemptID.getInstance(t1_v4, 0);
+    TezTaskAttemptID ta1_t2_v4 = TezTaskAttemptID.getInstance(t2_v4, 0);
+    TezTaskAttemptID ta1_t1_v5 = TezTaskAttemptID.getInstance(t1_v5, 0);
+    TezTaskAttemptID ta1_t2_v5 = TezTaskAttemptID.getInstance(t2_v5, 0);
+
+    TaskSpec taskSpec = new TaskSpec("dag", "vertex", 2, new ProcessorDescriptor(), new ArrayList<InputSpec>(),
+        new ArrayList<OutputSpec>(), null, conf);
+    TaskLocationHint locationHint = TaskLocationHint.createTaskLocationHint(null, null);
+
+    // Tasks can only succeed from a scheduled or running state
+    dispatcher.getEventHandler().handle(new TaskEventScheduleTask(t1_v4, taskSpec, locationHint, false));
+    dispatcher.getEventHandler().handle(new TaskEventScheduleTask(t2_v4, taskSpec, locationHint, false));
+
+    // Completed tasks are more that the max percent failure
+    dispatcher.getEventHandler().handle(new TaskEventTAFailed(ta1_t1_v4, TaskFailureType.NON_FATAL, null));
+    dispatcher.getEventHandler().handle(new TaskEventTAFailed(ta1_t2_v4, TaskFailureType.NON_FATAL, null));
+    dispatcher.getEventHandler().handle(new TaskEventTASucceeded(ta1_t1_v5));
+    dispatcher.getEventHandler().handle(new TaskEventTAFailed(ta1_t2_v5, TaskFailureType.NON_FATAL, null));
+    dispatcher.await();
+
+    Assert.assertEquals(VertexState.FAILED, v4.getState());
+    Assert.assertEquals(VertexState.SUCCEEDED, v5.getState());
+    Assert.assertEquals(VertexState.RUNNING, v6.getState());
+    Assert.assertEquals(2, v6.numSuccessSourceAttemptCompletions);
   }
 
   @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/a33d2213/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 241c6e9..479509d 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -43,14 +43,19 @@ import java.util.concurrent.locks.ReentrantLock;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -1033,6 +1038,21 @@ public class TestTezJobs {
     }
   }
 
+  public static class FailingAttemptProcessor extends SimpleProcessor {
+
+    public FailingAttemptProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      if (getContext().getTaskIndex() == 0) {
+        LOG.info("Failing task " + getContext().getTaskIndex() + ", attempt " + getContext().getTaskAttemptNumber());
+        throw new IOException("Failing task " + getContext().getTaskIndex() + ", attempt " + getContext().getTaskAttemptNumber());
+      }
+    }
+  }
+
   public static class InputInitializerForTest extends InputInitializer {
 
     private final ReentrantLock lock = new ReentrantLock();
@@ -1281,5 +1301,35 @@ public class TestTezJobs {
     }
   }
 
+  @Test(timeout = 60000)
+  public void testVertexFailuresMaxPercent() throws TezException, InterruptedException, IOException {
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT, "50.0f");
+    tezConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    TezClient tezClient = TezClient.create("TestVertexFailuresMaxPercent", tezConf);
+    tezClient.start();
+
+    try {
+      DAG dag = DAG.create("TestVertexFailuresMaxPercent");
+      Vertex vertex1 = Vertex.create("Parent", ProcessorDescriptor.create(
+          FailingAttemptProcessor.class.getName()), 2);
+      Vertex vertex2 = Vertex.create("Child", ProcessorDescriptor.create(FailingAttemptProcessor.class.getName()), 2);
+
+      OrderedPartitionedKVEdgeConfig edgeConfig = OrderedPartitionedKVEdgeConfig
+          .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+              HashPartitioner.class.getName())
+          .setFromConfiguration(tezConf)
+          .build();
+      dag.addVertex(vertex1)
+          .addVertex(vertex2)
+          .addEdge(Edge.create(vertex1, vertex2, edgeConfig.createDefaultEdgeProperty()));
 
+      DAGClient dagClient = tezClient.submitDAG(dag);
+      dagClient.waitForCompletion();
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    } finally {
+      tezClient.stop();
+    }
+  }
 }


Mime
View raw message