tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-842. Add built-in verification of expected execution pattern into TestFaultTolerance (Tassapol Athiapinya bikas)
Date Fri, 21 Feb 2014 00:17:54 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master 7e1d11010 -> 4f98537d6


TEZ-842. Add built-in verification of expected execution pattern into TestFaultTolerance (Tassapol
Athiapinya bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/4f98537d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/4f98537d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/4f98537d

Branch: refs/heads/master
Commit: 4f98537d6c0b17a6b30b7c74b57b01f65f208dcc
Parents: 7e1d110
Author: Bikas Saha <bikas@apache.org>
Authored: Thu Feb 20 16:17:49 2014 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Thu Feb 20 16:17:49 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/test/TestFaultTolerance.java | 149 ++++++++++++++++++-
 .../java/org/apache/tez/test/TestInput.java     |  22 ++-
 .../java/org/apache/tez/test/TestOutput.java    |  11 +-
 .../java/org/apache/tez/test/TestProcessor.java |  89 ++++++++++-
 4 files changed, 261 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4f98537d/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index b45aec4..bcf4a46 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -175,6 +175,17 @@ public class TestFaultTolerance {
     testConf.setInt(TestProcessor.getVertexConfName(
         TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 0);
     
+    //verify value at v2 task1
+    testConf.set(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "1");
+    //value of v2 task1 is 4.
+    //v1 attempt0 has value of 1 (attempt index + 1). 
+    //v1 attempt1 has value of 2 (attempt index + 1).
+    //v3 attempt0 verifies value of 1 + 2 (values from input vertices) 
+    // + 1 (attempt index + 1) = 4
+    testConf.setInt(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 4);
+
     DAG dag = SimpleTestDAG.createDAG("testBasicTaskFailure", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
@@ -189,6 +200,13 @@ public class TestFaultTolerance {
     testConf.setInt(TestProcessor.getVertexConfName(
         TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v1"), 1);
     
+    //v1 task0,1 attempt 2 succeed. Input sum = 6. Plus one (v2 attempt0).
+    //ending sum is 7.
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 7);
+    
     DAG dag = SimpleTestDAG.createDAG("testTaskMultipleFailures", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
@@ -221,6 +239,21 @@ public class TestFaultTolerance {
     testConf.set(TestInput.getVertexConfName(
         TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
     
+    //v2 task1 attempt0 index0 fails and exits.
+    //v1 task0 attempt1 reruns. 
+    //v2 task1 attempt1 has:
+    // v1 task0 attempt1 (value = 2) + v1 task1 attempt0 (value = 1)
+    // + its own value, attempt + 1 (value = 2). Total is 5.
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "1");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 5);
+    //v2 task0 attempt 0 succeeds instantly.
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 3);
+    
     DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithExit", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
@@ -237,6 +270,14 @@ public class TestFaultTolerance {
     testConf.set(TestInput.getVertexConfName(
         TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "0");
     
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "1");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 4);
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 3);
     DAG dag = SimpleTestDAG.createDAG("testBasicInputFailureWithoutExit", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
@@ -253,6 +294,19 @@ public class TestFaultTolerance {
     testConf.set(TestInput.getVertexConfName(
         TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v2"), "-1");
     
+    //v2 task0 attempt0 input0,1 fails. wait.
+    //v1 task0 attempt1 reruns. v1 task1 attempt1 reruns.
+    //2 + 2 + 1 = 5
+    //same number for v2 task1
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 5);
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "1");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 5);
+    
     DAG dag = SimpleTestDAG.createDAG("testMultipleInputFailureWithoutExit", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
@@ -271,6 +325,19 @@ public class TestFaultTolerance {
     testConf.setInt(TestInput.getVertexConfName(
         TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1);
     
+    //v2 task1 attempt0 input0 input-attempt0 fails. Wait. v1 task0 attempt1 reruns.
+    //v2 task1 attempt0 input0 input-attempt1 fails. Wait. v1 task0 attempt2 reruns.
+    //v2 task1 attempt0 input0 input-attempt2 succeeds.
+    //input values (3 + 1) + 1 = 5 
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "1");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 5);
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 3);
+    
     DAG dag = SimpleTestDAG.createDAG("testMultiVersionInputFailureWithoutExit", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
@@ -318,6 +385,18 @@ public class TestFaultTolerance {
     //3 attempts fail
     testConf.setInt(TestProcessor.getVertexConfName(
             TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "l3v1"), 2);
+    
+    //l2v1: task0 attempt0 succeeds. task1 attempt3 succeeds. 
+    //l3v1 finally task0 attempt3 will succeed.
+    //l1v1 outputs 1. l1v2 outputs 2.
+    //l2v1 task0 attempt0 output = 2. 
+    //l2v2 output: attempt0 (l1v2+self = 2+1) * 3 tasks = 9
+    //l3v1 task0 attempt3 = l2v1 (2) + l2v2 (9) + self (4) = 15
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "l3v1"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "l3v1", 0), 15);
+    
     DAG dag = ThreeLevelsFailingDAG.createDAG("testThreeLevelsFailingDAG2VerticesHaveFailedAttemptsDAGSucceeds", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
   }
@@ -334,7 +413,7 @@ public class TestFaultTolerance {
    * This will trigger rerun of v1-task1.
    * v1-task1-attempt2 is re-run and succeeds.
    * v2-task0-attempt0 (no attempt bump) runs. Check its input1. 
-   * The input version is now 1. The attempt will now succeed.
+   * The input version is now 2. The attempt will now succeed.
    * @throws Exception
    */
   @Test (timeout=60000)
@@ -363,6 +442,19 @@ public class TestFaultTolerance {
     //at v2-task0-attempt0/1-input1 has input failure at input version 0 only.
     testConf.setInt(TestInput.getVertexConfName(
         TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v2"), 1);
+    
+    //v2-task1-attempt0 takes v1-task0-attempt0 input and v1-task1-attempt1 input.
+    //v2-task1 does not take v1-task1-attempt2 (re-run caused by input failure 
+    //triggered by v2-task0) output.
+    //1 + 2 + 1 = 4
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 5);
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "1");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 1), 4);
   
     DAG dag = SimpleTestDAG.createDAG(
             "testInputFailureCausesRerunAttemptWithinMaxAttemptSuccess", testConf);
@@ -425,6 +517,18 @@ public class TestFaultTolerance {
   public void testCascadingInputFailureWithoutExitSuccess() throws Exception {
     Configuration testConf = new Configuration(false);
     setCascadingInputFailureConfig(testConf, false);
+    
+    //v2 task0 attempt1 value = v1 task0 attempt1 (2) + v1 task1 attempt0 (1) + 2 = 5
+    //v3 all-tasks attempt0 takes v2 task0 attempt1 value (5) + v2 task1 attempt0 (3) + 1 = 9
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 0), 9);
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), "1");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 1), 9);
+    
     DAG dag = SimpleTestDAG3Vertices.createDAG(
               "testCascadingInputFailureWithoutExitSuccess", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
@@ -445,6 +549,18 @@ public class TestFaultTolerance {
   public void testCascadingInputFailureWithExitSuccess() throws Exception {
     Configuration testConf = new Configuration(false);
     setCascadingInputFailureConfig(testConf, true);
+    
+    //v2 task0 attempt2 value = v1 task0 attempt1 (2) + v1 task1 attempt0 (1) + 3 = 6
+    //v3 all-tasks attempt1 takes v2 task0 attempt2 value (6) + v2 task1 attempt0 (3) + 2 = 11
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 0), 11);
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), "1");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 1), 11);
+    
     DAG dag = SimpleTestDAG3Vertices.createDAG(
               "testCascadingInputFailureWithExitSuccess", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
@@ -474,6 +590,14 @@ public class TestFaultTolerance {
     testConf.set(TestInput.getVertexConfName(
         TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "1");
     
+    //v3 attempt0:
+    //v1 task0,1 attempt2 = 6. v2 task0,1 attempt2 = 6.
+    //total = 6 + 6 + 1 = 13
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 0), 13);
+    
     DAG dag = SimpleVTestDAG.createDAG(
             "testInputFailureCausesRerunOfTwoVerticesWithoutExit", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
@@ -499,6 +623,17 @@ public class TestFaultTolerance {
     testConf.setInt(TestProcessor.getVertexConfName(
         TestProcessor.TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, "v3"), 1);
     
+    //v1 input = 2. v2 input = 2
+    //v3 attempt2 value = 2 + 2 + 3 = 7
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 0), 7);
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), "1");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 1), 7);
+    
     DAG dag = SimpleVTestDAG.createDAG(
             "testAttemptOfDownstreamVertexConnectedWithTwoUpstreamVerticesFailure", testConf);
     runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
@@ -542,6 +677,18 @@ public class TestFaultTolerance {
         TestInput.TEZ_FAILING_INPUT_FAILING_INPUT_INDEX, "v3"), "-1");
     testConf.set(TestInput.getVertexConfName(
         TestInput.TEZ_FAILING_INPUT_FAILING_UPTO_INPUT_ATTEMPT, "v3"), "0");
+    
+    //both vertices trigger v1 rerun. v1 attempt1 output is 2 * 2 tasks = 4.
+    //v2 attempt0 = 4 + 1 = 5
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v2"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v2", 0), 5);
+    //v3 attempt0 = 4 + 1 = 5
+    testConf.set(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "v3"), "0");
+    testConf.setInt(TestProcessor.getVertexConfName(
+            TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "v3", 0), 5);
         
     DAG dag = SimpleReverseVTestDAG.createDAG(
             "testInputFailureRerunCanSendOutputToTwoDownstreamVertices", testConf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4f98537d/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
index 8ac1585..da4ea16 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.test;
 
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -26,8 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.Reader;
@@ -70,6 +71,7 @@ public class TestInput implements LogicalInput {
   Set<Integer> failingTaskAttempts = Sets.newHashSet();
   Set<Integer> failingInputIndices = Sets.newHashSet();
   Integer failAll = new Integer(-1);
+  int[] inputValues;
   
   /**
    * Enable failure for this logical input
@@ -194,7 +196,17 @@ public class TestInput implements LogicalInput {
         
       }
     } while (!done);
-    return numInputs;
+    
+    // sum input value given by upstream tasks
+    int sum = 0;
+    for (int i=0; i<numInputs; ++i) {
+      if (inputValues[i] == -1) {
+        throwException("Invalid input value : " + i);
+      }
+      sum += inputValues[i];
+    }
+    // return sum value
+    return sum;
   }
   
   void throwException(String msg) {
@@ -213,7 +225,7 @@ public class TestInput implements LogicalInput {
     this.inputContext.requestInitialMemory(0l, null); //Mandatory call.
     if (inputContext.getUserPayload() != null) {
       String vName = inputContext.getTaskVertexName();
-      conf = MRHelpers.createConfFromUserPayload(inputContext.getUserPayload());
+      conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
       doFail = conf.getBoolean(getVertexConfName(TEZ_FAILING_INPUT_DO_FAIL, vName), false);
       doFailAndExit = conf.getBoolean(
           getVertexConfName(TEZ_FAILING_INPUT_DO_FAIL_AND_EXIT, vName), false);
@@ -266,6 +278,8 @@ public class TestInput implements LogicalInput {
             " numInputs: " + numInputs +
             " numCompletedInputs: " + numCompletedInputs);
         this.completedInputVersion[dmEvent.getTargetIndex()] = dmEvent.getVersion();
+        this.inputValues[dmEvent.getTargetIndex()] = 
+            ByteBuffer.wrap(dmEvent.getUserPayload()).getInt();
       } else if (event instanceof InputFailedEvent) {
         InputFailedEvent ifEvent = (InputFailedEvent) event;
         numCompletedInputs--;
@@ -304,8 +318,10 @@ public class TestInput implements LogicalInput {
   public void setNumPhysicalInputs(int numInputs) {
     this.numInputs = numInputs;
     this.completedInputVersion = new int[numInputs];
+    this.inputValues = new int[numInputs];
     for (int i=0; i<numInputs; ++i) {
       this.completedInputVersion[i] = -1;
+      this.inputValues[i] = -1;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4f98537d/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
index bef4218..4814d2d 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.test;
 
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -40,6 +41,7 @@ public class TestOutput implements LogicalOutput {
         setUserPayload(payload);
   }
   
+  int output;
   int numOutputs;
   TezOutputContext outputContext;
   
@@ -50,6 +52,10 @@ public class TestOutput implements LogicalOutput {
     this.outputContext.requestInitialMemory(0l, null); //Mandatory call
     return Collections.emptyList();
   }
+  
+  void write(int value) {
+    this.output = value;
+  }
 
   @Override
   public void start() {
@@ -66,10 +72,11 @@ public class TestOutput implements LogicalOutput {
 
   @Override
   public List<Event> close() throws Exception {
-    LOG.info("Sending data movement event");
+    LOG.info("Sending data movement event with value: " + output);
+    byte[] result = ByteBuffer.allocate(4).putInt(output).array();
     List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
     for (int i = 0; i < numOutputs; i++) {
-      DataMovementEvent event = new DataMovementEvent(i, null);
+      DataMovementEvent event = new DataMovementEvent(i, result);
       events.add(event);
     }
     return events;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4f98537d/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index 2ebe3f1..086e458 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -25,8 +25,8 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -57,6 +57,9 @@ public class TestProcessor implements LogicalIOProcessor {
   int failingTaskAttemptUpto = 0;
   Integer failAll = new Integer(-1);
   
+  int verifyValue = -1;
+  Set<Integer> verifyTaskIndices = Sets.newHashSet();
+  
   /**
    * Enable failure for this processor
    */
@@ -80,6 +83,12 @@ public class TestProcessor implements LogicalIOProcessor {
   public static String TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT =
       "tez.failing-processor.failing-upto-task-attempt";
   
+  public static String TEZ_FAILING_PROCESSOR_VERIFY_VALUE = 
+      "tez.failing-processor.verify-value";
+  
+  public static String TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX =
+      "tez.failing-processor.verify-task-index";
+  
   public static ProcessorDescriptor getProcDesc(byte[] payload) {
     return new ProcessorDescriptor(TestProcessor.class.getName()).
         setUserPayload(payload);
@@ -95,13 +104,30 @@ public class TestProcessor implements LogicalIOProcessor {
     return confName + "." + vertexName;
   }
   
+  public static String getVertexConfName(String confName, String vertexName,
+      int taskIndex) {
+    return confName + "." + vertexName + "." + String.valueOf(taskIndex);
+  }
+  
   @Override
   public void initialize(TezProcessorContext processorContext) throws Exception {
     this.processorContext = processorContext;
     if (processorContext.getUserPayload() != null) {
       String vName = processorContext.getTaskVertexName();
-      conf = MRHelpers.createConfFromUserPayload(processorContext
+      conf = TezUtils.createConfFromUserPayload(processorContext
           .getUserPayload());
+      verifyValue = conf.getInt(
+          getVertexConfName(TEZ_FAILING_PROCESSOR_VERIFY_VALUE, vName,
+              processorContext.getTaskIndex()), -1);
+      if (verifyValue != -1) {
+        LOG.info("Verify value: " + verifyValue);
+        for (String verifyIndex : conf
+            .getTrimmedStringCollection(
+                getVertexConfName(TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, vName))) {
+          LOG.info("Adding verify task index: " + verifyIndex);
+          verifyTaskIndices.add(Integer.valueOf(verifyIndex));
+        }
+      }
       doFail = conf.getBoolean(
           getVertexConfName(TEZ_FAILING_PROCESSOR_DO_FAIL, vName), false);
       sleepMs = conf.getLong(
@@ -159,10 +185,65 @@ public class TestProcessor implements LogicalIOProcessor {
       }
     }
     
+    if (inputs.entrySet().size() > 0) {
+        String msg = "Reading input of current FailingProcessor: " + processorContext.getUniqueIdentifier() +
+            " dag: " + processorContext.getDAGName() +
+            " vertex: " + processorContext.getTaskVertexName() +
+            " taskIndex: " + processorContext.getTaskIndex() +
+            " taskAttempt: " + processorContext.getTaskAttemptNumber();
+        LOG.info(msg);
+    }
+    //initialize sum to attempt number + 1
+    int sum = processorContext.getTaskAttemptNumber() + 1;
+    LOG.info("initializing vertex= " + processorContext.getTaskVertexName() +
+             " taskIndex: " + processorContext.getTaskIndex() +
+             " taskAttempt: " + processorContext.getTaskAttemptNumber() +
+             " sum= " + sum);
+    //sum = summation of input values
     for (Map.Entry<String, LogicalInput> entry : inputs.entrySet()) {
-      LOG.info("Reading input: " + entry.getKey());
       TestInput input = (TestInput) entry.getValue();
-      input.doRead();
+      int inputValue = input.doRead();
+      LOG.info("Reading input: " + entry.getKey() + " inputValue= " + inputValue);
+      sum += inputValue;
+    }
+    
+    if (outputs.entrySet().size() > 0) {
+        String msg = "Writing output of current FailingProcessor: " + processorContext.getUniqueIdentifier() +
+            " dag: " + processorContext.getDAGName() +
+            " vertex: " + processorContext.getTaskVertexName() +
+            " taskIndex: " + processorContext.getTaskIndex() +
+            " taskAttempt: " + processorContext.getTaskAttemptNumber();
+        LOG.info(msg);
+    }
+    for (Map.Entry<String, LogicalOutput> entry : outputs.entrySet()) {
+      LOG.info("Writing output: " + entry.getKey() + " sum= " + sum);
+      TestOutput output = (TestOutput) entry.getValue();
+      output.write(sum);
+    }
+    
+    LOG.info("Output for DAG: " + processorContext.getDAGName() 
+        + " vertex: " + processorContext.getTaskVertexName()
+        + " task: " + processorContext.getTaskIndex()
+        + " attempt: " + processorContext.getTaskAttemptNumber()
+        + " is: " + sum);
+    if (verifyTaskIndices
+        .contains(new Integer(processorContext.getTaskIndex()))) {
+      if (verifyValue != -1 && verifyValue != sum) {
+        // expected output value set and not equal to observed value
+        String msg = "Expected output mismatch of current FailingProcessor: " 
+                     + processorContext.getUniqueIdentifier() + 
+                     " dag: " + processorContext.getDAGName() +
+                     " vertex: " + processorContext.getTaskVertexName() +
+                     " taskIndex: " + processorContext.getTaskIndex() +
+                     " taskAttempt: " + processorContext.getTaskAttemptNumber();
+        msg += "\n" + "Expected output: " + verifyValue + " got: " + sum;
+        throwException(msg);
+      } else {
+        LOG.info("Verified output for DAG: " + processorContext.getDAGName()
+            + " vertex: " + processorContext.getTaskVertexName() + " task: "
+            + processorContext.getTaskIndex() + " attempt: "
+            + processorContext.getTaskAttemptNumber() + " is: " + sum);
+      }
     }
   }
 


Mime
View raw message