tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1346. Change Processor to require context constructors for creation, and remove the requirement of the initialize method requiring the context. (sseth)
Date Thu, 31 Jul 2014 03:35:37 GMT
Repository: tez
Updated Branches:
  refs/heads/master 1c16b5bfc -> 70db632f5


TEZ-1346. Change Processor to require context constructors for creation,
and remove the requirement of the initialize method requiring the
context. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/70db632f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/70db632f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/70db632f

Branch: refs/heads/master
Commit: 70db632f5dc26ec66b69b669337a33ea555a0559
Parents: 1c16b5b
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Jul 30 20:34:56 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Jul 30 20:35:27 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../runtime/api/AbstractLogicalIOProcessor.java |  19 ++--
 .../tez/runtime/api/MergedLogicalInput.java     |   2 +-
 .../org/apache/tez/runtime/api/Processor.java   |   6 +-
 .../tez/runtime/task/TestTaskExecution.java     |   5 +
 .../examples/BroadcastAndOneToOneExample.java   |   9 ++
 .../mapreduce/examples/IntersectDataGen.java    |   5 +
 .../mapreduce/examples/IntersectExample.java    |   9 ++
 .../mapreduce/examples/IntersectValidate.java   |   7 +-
 .../tez/mapreduce/examples/UnionExample.java    |   9 ++
 .../tez/mapreduce/examples/WordCount.java       |   9 ++
 .../processor/FilterByWordInputProcessor.java   |  14 +--
 .../processor/FilterByWordOutputProcessor.java  |  11 +-
 .../apache/tez/mapreduce/processor/MRTask.java  |  25 +++--
 .../mapreduce/processor/SimpleMRProcessor.java  |   7 +-
 .../mapreduce/processor/map/MapProcessor.java   |  10 +-
 .../processor/reduce/ReduceProcessor.java       |  11 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  17 ++--
 .../TestLogicalIOProcessorRuntimeTask.java      |   4 +-
 .../library/common/sort/impl/TezMerger.java     |   2 +-
 .../library/processor/SimpleProcessor.java      |   5 +
 .../library/processor/SleepProcessor.java       |  14 ++-
 .../java/org/apache/tez/test/TestProcessor.java | 102 ++++++++++---------
 23 files changed, 197 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 404de28..e2e673f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -39,6 +39,7 @@ INCOMPATIBLE CHANGES
   TEZ-1303. Change Inputs, Outputs, InputInitializer, OutputCommitter, VertexManagerPlugin, EdgeManager
   to require constructors for creation, and remove the initialize methods.
   TEZ-1133. Remove some unused methods from MRHelpers.
+  TEZ-1346. Change Processor to require context constructors for creation, and remove the requirement of the initialize method requiring the context.
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
index 27e1bc8..7688c14 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
@@ -28,17 +28,24 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor {
-  protected TezProcessorContext context;
+  private final TezProcessorContext context;
 
-  @Override
-  public void initialize(TezProcessorContext processorContext) throws Exception {
-    this.context = processorContext;
-    initialize();
+  /**
+   * Constructs an instance of the LogicalProcessor. Classes extending this one to create a
+   * LogicalProcessor must provide the same constructor so that Tez can create an instance of the
+   * class at runtime.
+   *
+   * @param context the {@link org.apache.tez.runtime.api.TezProcessorContext} which provides
+   *                the Processor with context information within the running task.
+   */
+  public AbstractLogicalIOProcessor(TezProcessorContext context) {
+    this.context = context;
   }
 
+  @Override
   public abstract void initialize() throws Exception;
 
-  public TezProcessorContext getContext() {
+  public final TezProcessorContext getContext() {
     return context;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index 6f7b14c..656bb2c 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -61,7 +61,7 @@ public abstract class MergedLogicalInput implements LogicalInput {
     return inputs;
   }
   
-  public TezMergedInputContext getContext() {
+  public final TezMergedInputContext getContext() {
     return context;
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
index 49f35cd..358bd43 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
@@ -29,12 +29,10 @@ public interface Processor {
   /**
    * Initializes the <code>Processor</code>
    *
-   * @param processorContext
-   * @throws java.io.IOException
+   * @throws java.lang.Exception
    *           if an error occurs
    */
-  public void initialize(TezProcessorContext processorContext)
-      throws Exception;
+  public void initialize() throws Exception;
 
   /**
    * Handles user and system generated {@link Event}s.

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
index 98abf1c..3fd6e2e 100644
--- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
+++ b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
@@ -59,6 +59,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.impl.InputSpec;
@@ -355,6 +356,10 @@ public class TestTaskExecution {
     private boolean signalFatalAndLoop = false;
     private boolean signalFatalAndComplete = false;
 
+    public TestProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
     @Override
     public void initialize() throws Exception {
       parseConf(getContext().getUserPayload());

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index 7e7d351..1a667a3 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -46,6 +46,7 @@ import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -60,6 +61,10 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
   public static class InputProcessor extends SimpleProcessor {
     Text word = new Text();
 
+    public InputProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
     @Override
     public void run() throws Exception {
       Preconditions.checkArgument(getOutputs().size() == 1);
@@ -82,6 +87,10 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
   public static class OneToOneProcessor extends SimpleProcessor {
     Text word = new Text();
 
+    public OneToOneProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
     @Override
     public void run() throws Exception {
       Preconditions.checkArgument(inputs.size() == 2);

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index 9ed33f9..d5b0eb9 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -56,6 +56,7 @@ import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 import com.google.common.base.Preconditions;
@@ -236,6 +237,10 @@ public class IntersectDataGen extends Configured implements Tool {
     long hashOutputFileSize;
     float overlapApprox = 0.2f;
 
+    public GenDataProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
     public static byte[] createConfiguration(long streamOutputFileSize, long hashOutputFileSize)
         throws IOException {
       ByteArrayOutputStream bos = new ByteArrayOutputStream();

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index c5bf792..08a8029 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -62,6 +62,7 @@ import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfigurer;
@@ -258,6 +259,10 @@ public class IntersectExample extends Configured implements Tool {
    * Reads key-values from the source and forwards the value as the key for the output
    */
   public static class ForwardingProcessor extends SimpleProcessor {
+    public ForwardingProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
     @Override
     public void run() throws Exception {
       Preconditions.checkState(getInputs().size() == 1);
@@ -279,6 +284,10 @@ public class IntersectExample extends Configured implements Tool {
 
   public static class IntersectProcessor extends SimpleProcessor {
 
+    public IntersectProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
     @Override
     public void run() throws Exception {
       Preconditions.checkState(getInputs().size() == 2);

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index 585ee63..bf5aa01 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -56,6 +56,7 @@ import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
@@ -237,7 +238,11 @@ public class IntersectValidate extends Configured implements Tool {
   public static class IntersectValidateProcessor extends SimpleProcessor {
 
     private static final Log LOG = LogFactory.getLog(IntersectValidateProcessor.class);
-    
+
+    public IntersectValidateProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
     @Override
     public void run() throws Exception {
       Preconditions.checkState(getInputs().size() == 2);

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index fdbe187..1fdd1f3 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -60,6 +60,7 @@ import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.Output;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
@@ -77,6 +78,10 @@ public class UnionExample {
     IntWritable one = new IntWritable(1);
     Text word = new Text();
 
+    public TokenProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
     @Override
     public void run() throws Exception {
       Preconditions.checkArgument(getInputs().size() == 1);
@@ -121,6 +126,10 @@ public class UnionExample {
   public static class UnionProcessor extends SimpleMRProcessor {
     IntWritable one = new IntWritable(1);
 
+    public UnionProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
     @Override
     public void run() throws Exception {
       Preconditions.checkArgument(getInputs().size() == 2);

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 00fc326..0de2b04 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -57,6 +57,7 @@ import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.api.Output;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
@@ -72,6 +73,10 @@ public class WordCount extends Configured implements Tool {
     IntWritable one = new IntWritable(1);
     Text word = new Text();
 
+    public TokenProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
     @Override
     public void run() throws Exception {
       Preconditions.checkArgument(getInputs().size() == 1);
@@ -92,6 +97,10 @@ public class WordCount extends Configured implements Tool {
   }
 
   public static class SumProcessor extends SimpleMRProcessor {
+    public SumProcessor(TezProcessorContext context) {
+      super(context);
+    }
+
     @Override
     public void run() throws Exception {
       Preconditions.checkArgument(getInputs().size() == 1);

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index 1d78366..0314e83 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -32,8 +32,8 @@ import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
@@ -41,21 +41,23 @@ import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 
-public class FilterByWordInputProcessor implements LogicalIOProcessor {
+public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
 
   private static final Log LOG = LogFactory.getLog(FilterByWordInputProcessor.class);
 
   private String filterWord;
 
-  public FilterByWordInputProcessor() {
+  public FilterByWordInputProcessor(TezProcessorContext context) {
+    super(context);
   }
 
+
   @Override
-  public void initialize(TezProcessorContext processorContext) throws Exception {
-    Configuration conf = TezUtils.createConfFromUserPayload(processorContext.getUserPayload());
+  public void initialize() throws Exception {
+    Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     filterWord = conf.get(FilterLinesByWord.FILTER_PARAM_NAME);
     if (filterWord == null) {
-      processorContext.fatalError(null, "No filter word specified");
+      getContext().fatalError(null, "No filter word specified");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
index d061ea0..bad2b9c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
@@ -25,6 +25,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -35,17 +37,18 @@ import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 
 
-public class FilterByWordOutputProcessor implements LogicalIOProcessor {
+public class FilterByWordOutputProcessor extends AbstractLogicalIOProcessor {
 
   private static final Log LOG = LogFactory.getLog(MapProcessor.class);
   private TezProcessorContext processorContext;
 
-  public FilterByWordOutputProcessor() {
+  public FilterByWordOutputProcessor(TezProcessorContext context) {
+    super(context);
   }
 
+
   @Override
-  public void initialize(TezProcessorContext processorContext) throws Exception {
-    this.processorContext = processorContext;
+  public void initialize() throws Exception {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index ee002d8..d867107 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -76,6 +76,7 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
@@ -83,7 +84,7 @@ import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 
 @SuppressWarnings("deprecation")
-public abstract class MRTask {
+public abstract class MRTask extends AbstractLogicalIOProcessor {
 
   static final Log LOG = LogFactory.getLog(MRTask.class);
 
@@ -118,27 +119,29 @@ public abstract class MRTask {
   protected MRTaskReporter mrReporter;
   protected boolean useNewApi;
 
-  public MRTask(boolean isMap) {
+  public MRTask(TezProcessorContext processorContext, boolean isMap) {
+    super(processorContext);
     this.isMap = isMap;
   }
 
   // TODO how to update progress
-  public void initialize(TezProcessorContext context) throws IOException,
+  @Override
+  public void initialize() throws IOException,
   InterruptedException {
 
     DeprecatedKeys.init();
 
-    processorContext = context;
-    counters = context.getCounters();
+    processorContext = getContext();
+    counters = processorContext.getCounters();
     this.taskAttemptId = new TaskAttemptID(
         new TaskID(
-            Long.toString(context.getApplicationId().getClusterTimestamp()),
-            context.getApplicationId().getId(),
+            Long.toString(processorContext.getApplicationId().getClusterTimestamp()),
+            processorContext.getApplicationId().getId(),
             (isMap ? TaskType.MAP : TaskType.REDUCE),
-            context.getTaskIndex()),
-          context.getTaskAttemptNumber());
+            processorContext.getTaskIndex()),
+        processorContext.getTaskAttemptNumber());
 
-    byte[] userPayload = context.getUserPayload();
+    byte[] userPayload = processorContext.getUserPayload();
     Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
     if (conf instanceof JobConf) {
       this.jobConf = (JobConf)conf;
@@ -150,7 +153,7 @@ public abstract class MRTask {
     jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
       taskAttemptId.toString());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
-        context.getDAGAttemptNumber());
+        processorContext.getDAGAttemptNumber());
 
     LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
 

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
index 5fdcec0..23877de 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/SimpleMRProcessor.java
@@ -24,13 +24,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.processor.SimpleProcessor;
 
 import com.google.common.collect.Lists;
 
 public abstract class SimpleMRProcessor extends SimpleProcessor {
   private static final Log LOG = LogFactory.getLog(SimpleMRProcessor.class);
-  
+
+  public SimpleMRProcessor(TezProcessorContext context) {
+    super(context);
+  }
+
   @Override
   protected void postOp() throws Exception {
     if (getOutputs() == null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index a2dd7d1..ca6dbe2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -53,19 +53,19 @@ import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
-public class MapProcessor extends MRTask implements LogicalIOProcessor {
+public class MapProcessor extends MRTask {
 
   private static final Log LOG = LogFactory.getLog(MapProcessor.class);
 
-  public MapProcessor(){
-    super(true);
+  public MapProcessor(TezProcessorContext processorContext) {
+    super(processorContext, true);
   }
 
   @Override
-  public void initialize(TezProcessorContext processorContext)
+  public void initialize()
       throws IOException {
     try {
-      super.initialize(processorContext);
+      super.initialize();
     } catch (InterruptedException e) {
       throw new IOException(e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index c5ade59..e4a2d93 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -42,7 +42,6 @@ import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
@@ -55,22 +54,22 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
-public class ReduceProcessor extends MRTask implements LogicalIOProcessor {
+public class ReduceProcessor extends MRTask {
 
   private static final Log LOG = LogFactory.getLog(ReduceProcessor.class);
 
   private Counter reduceInputKeyCounter;
   private Counter reduceInputValueCounter;
 
-  public ReduceProcessor() {
-    super(false);
+  public ReduceProcessor(TezProcessorContext processorContext) {
+    super(processorContext, false);
   }
 
   @Override
-  public void initialize(TezProcessorContext processorContext)
+  public void initialize()
       throws IOException {
     try {
-      super.initialize(processorContext);
+      super.initialize();
     } catch (InterruptedException e) {
       throw new IOException(e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 5511b72..b0d6ffa 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -105,7 +105,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
 
   private final ProcessorDescriptor processorDescriptor;
-  private final LogicalIOProcessor processor;
+  private LogicalIOProcessor processor;
   private TezProcessorContext processorContext;
 
   private final MemoryDistributor initialMemoryDistributor;
@@ -154,7 +154,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
 
     this.processorDescriptor = taskSpec.getProcessorDescriptor();
-    this.processor = createProcessor(processorDescriptor);
     this.serviceConsumerMetadata = serviceConsumerMetadata;
     this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
     this.state = State.NEW;
@@ -182,6 +181,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     Preconditions.checkState(this.state == State.NEW, "Already initialized");
     this.state = State.INITED;
 
+    LOG.info("Creating processor" + ", processorClassName=" + processorDescriptor.getClassName());
+    this.processorContext = createProcessorContext();
+    this.processor = createProcessor(processorDescriptor.getClassName(), processorContext);
+
     int numTasks = 0;
 
     int inputIndex = 0;
@@ -466,9 +469,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private void initializeLogicalIOProcessor() throws Exception {
     LOG.info("Initializing processor" + ", processorClassName="
         + processorDescriptor.getClassName());
-    TezProcessorContext processorContext = createProcessorContext();
-    this.processorContext = processorContext;
-    processor.initialize(processorContext);
+    processor.initialize();
     LOG.info("Initialized processor" + ", processorClassName="
         + processorDescriptor.getClassName());
   }
@@ -553,9 +554,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private LogicalIOProcessor createProcessor(
-      ProcessorDescriptor processorDescriptor) {
-    Processor processor = ReflectionUtils.createClazzInstance(processorDescriptor
-        .getClassName());
+      String processorClassName, TezProcessorContext processorContext) {
+    Processor processor = ReflectionUtils.createClazzInstance(processorClassName,
+        new Class[]{TezProcessorContext.class}, new Object[]{processorContext});
     if (!(processor instanceof LogicalIOProcessor)) {
       throw new TezUncheckedException(processor.getClass().getName()
           + " is not a sub-type of LogicalIOProcessor."

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index dcf3303..7740cb0 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -43,6 +43,7 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
@@ -141,7 +142,8 @@ public class TestLogicalIOProcessorRuntimeTask {
 
     public static volatile int runCount = 0;
 
-    public TestProcessor() {
+    public TestProcessor(TezProcessorContext context) {
+      super(context);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index f5f1478..217e63a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -408,7 +408,7 @@ public class TezMerger {
     };
 
     public MergeQueue(Configuration conf, FileSystem fs, 
-                      Path[] inputs, boolean deleteInputs, 
+                      Path[] inputs, boolean deleteInputs,
                       CompressionCodec codec, boolean ifileReadAhead,
                       int ifileReadAheadLength, int ifileBufferSize,
                       boolean considerFinalMergeForProgress,

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
index 60eae30..85ba412 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
@@ -24,11 +24,16 @@ import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
 
 public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
 
+  public SimpleProcessor(TezProcessorContext context) {
+    super(context);
+  }
+
   public void run(Map<String, LogicalInput> _inputs, Map<String, LogicalOutput> _outputs)
       throws Exception {
     this.inputs = _inputs;

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index fada3fd..f0e69ad 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -20,8 +20,8 @@ package org.apache.tez.runtime.library.processor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
@@ -35,23 +35,27 @@ import java.util.Map;
  *
  * @see SleepProcessorConfig for configuring the SleepProcessor
  */
-public class SleepProcessor implements LogicalIOProcessor {
+public class SleepProcessor extends AbstractLogicalIOProcessor {
 
   private static final Log LOG = LogFactory.getLog(SleepProcessor.class);
 
   private int timeToSleepMS;
 
+  public SleepProcessor(TezProcessorContext context) {
+    super(context);
+  }
+
   @Override
-  public void initialize(TezProcessorContext processorContext)
+  public void initialize()
     throws Exception {
-    if (processorContext.getUserPayload() == null) {
+    if (getContext().getUserPayload() == null) {
       LOG.info("No processor user payload specified"
         + ", using default timeToSleep of 1 ms");
       timeToSleepMS = 1;
     } else {
       SleepProcessorConfig cfg =
         new SleepProcessorConfig();
-      cfg.fromUserPayload(processorContext.getUserPayload());
+      cfg.fromUserPayload(getContext().getUserPayload());
       timeToSleepMS = cfg.getTimeToSleepMS();
     }
     LOG.info("Initialized SleepProcessor, timeToSleepMS=" + timeToSleepMS);

http://git-wip-us.apache.org/repos/asf/tez/blob/70db632f/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index db89e91..f0f7594 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -44,12 +45,11 @@ import com.google.common.collect.Sets;
  * fail. It fails and exits if configured to do so. If not, then it calls
  * doRead() on all inputs to let them fail.
  */
-public class TestProcessor implements LogicalIOProcessor {
+public class TestProcessor extends AbstractLogicalIOProcessor {
   private static final Log LOG = LogFactory
       .getLog(TestProcessor.class);
   
   Configuration conf;
-  TezTaskContext processorContext;
   
   boolean doFail = false;
   long sleepMs;
@@ -88,7 +88,19 @@ public class TestProcessor implements LogicalIOProcessor {
   
   public static String TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX =
       "tez.failing-processor.verify-task-index";
-  
+
+  /**
+   * Constructs an instance of the LogicalProcessor. Classes extending this one to create a
+   * LogicalProcessor must provide the same constructor so that Tez can create an instance of the
+   * class at runtime.
+   *
+   * @param context the {@link org.apache.tez.runtime.api.TezProcessorContext} which provides
+   *                the Processor with context information within the running task.
+   */
+  public TestProcessor(TezProcessorContext context) {
+    super(context);
+  }
+
   public static ProcessorDescriptor getProcDesc(byte[] payload) {
     return new ProcessorDescriptor(TestProcessor.class.getName()).
         setUserPayload(payload);
@@ -96,7 +108,7 @@ public class TestProcessor implements LogicalIOProcessor {
 
   void throwException(String msg) {
     RuntimeException e = new RuntimeException(msg);
-    processorContext.fatalError(e , msg);
+    getContext().fatalError(e , msg);
     throw e;
   }
 
@@ -110,15 +122,13 @@ public class TestProcessor implements LogicalIOProcessor {
   }
   
   @Override
-  public void initialize(TezProcessorContext processorContext) throws Exception {
-    this.processorContext = processorContext;
-    if (processorContext.getUserPayload() != null) {
-      String vName = processorContext.getTaskVertexName();
-      conf = TezUtils.createConfFromUserPayload(processorContext
-          .getUserPayload());
+  public void initialize() throws Exception {
+    if (getContext().getUserPayload() != null) {
+      String vName = getContext().getTaskVertexName();
+      conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
       verifyValue = conf.getInt(
           getVertexConfName(TEZ_FAILING_PROCESSOR_VERIFY_VALUE, vName,
-              processorContext.getTaskIndex()), -1);
+              getContext().getTaskIndex()), -1);
       if (verifyValue != -1) {
         LOG.info("Verify value: " + verifyValue);
         for (String verifyIndex : conf
@@ -143,7 +153,7 @@ public class TestProcessor implements LogicalIOProcessor {
         failingTaskAttemptUpto = conf.getInt(
             getVertexConfName(TEZ_FAILING_PROCESSOR_FAILING_UPTO_TASK_ATTEMPT, vName), 0);
         LOG.info("Adding failing attempt : " + failingTaskAttemptUpto + 
-            " dag: " + processorContext.getDAGName());
+            " dag: " + getContext().getDAGName());
       }
     }
   }
@@ -173,31 +183,31 @@ public class TestProcessor implements LogicalIOProcessor {
     if (doFail) {
       if (
           (failingTaskIndices.contains(failAll) ||
-          failingTaskIndices.contains(processorContext.getTaskIndex())) &&
+          failingTaskIndices.contains(getContext().getTaskIndex())) &&
           (failingTaskAttemptUpto == failAll.intValue() || 
-           failingTaskAttemptUpto >= processorContext.getTaskAttemptNumber())) {
-        String msg = "FailingProcessor: " + processorContext.getUniqueIdentifier() + 
-            " dag: " + processorContext.getDAGName() +
-            " taskIndex: " + processorContext.getTaskIndex() +
-            " taskAttempt: " + processorContext.getTaskAttemptNumber();
+           failingTaskAttemptUpto >= getContext().getTaskAttemptNumber())) {
+        String msg = "FailingProcessor: " + getContext().getUniqueIdentifier() + 
+            " dag: " + getContext().getDAGName() +
+            " taskIndex: " + getContext().getTaskIndex() +
+            " taskAttempt: " + getContext().getTaskAttemptNumber();
         LOG.info(msg);
         throwException(msg);
       }
     }
     
     if (inputs.entrySet().size() > 0) {
-        String msg = "Reading input of current FailingProcessor: " + processorContext.getUniqueIdentifier() + 
-            " dag: " + processorContext.getDAGName() +
-            " vertex: " + processorContext.getTaskVertexName() +
-            " taskIndex: " + processorContext.getTaskIndex() +
-            " taskAttempt: " + processorContext.getTaskAttemptNumber();
+        String msg = "Reading input of current FailingProcessor: " + getContext().getUniqueIdentifier() + 
+            " dag: " + getContext().getDAGName() +
+            " vertex: " + getContext().getTaskVertexName() +
+            " taskIndex: " + getContext().getTaskIndex() +
+            " taskAttempt: " + getContext().getTaskAttemptNumber();
         LOG.info(msg);
     }
     //initialize sum to attempt number + 1
-    int sum = processorContext.getTaskAttemptNumber() + 1;
-    LOG.info("initializing vertex= " + processorContext.getTaskVertexName() +
-             " taskIndex: " + processorContext.getTaskIndex() +
-             " taskAttempt: " + processorContext.getTaskAttemptNumber() +
+    int sum = getContext().getTaskAttemptNumber() + 1;
+    LOG.info("initializing vertex= " + getContext().getTaskVertexName() +
+             " taskIndex: " + getContext().getTaskIndex() +
+             " taskAttempt: " + getContext().getTaskAttemptNumber() +
              " sum= " + sum);
     //sum = summation of input values
     for (Map.Entry<String, LogicalInput> entry : inputs.entrySet()) {
@@ -213,11 +223,11 @@ public class TestProcessor implements LogicalIOProcessor {
     }
     
     if (outputs.entrySet().size() > 0) {
-        String msg = "Writing output of current FailingProcessor: " + processorContext.getUniqueIdentifier() + 
-            " dag: " + processorContext.getDAGName() +
-            " vertex: " + processorContext.getTaskVertexName() +
-            " taskIndex: " + processorContext.getTaskIndex() +
-            " taskAttempt: " + processorContext.getTaskAttemptNumber();
+        String msg = "Writing output of current FailingProcessor: " + getContext().getUniqueIdentifier() + 
+            " dag: " + getContext().getDAGName() +
+            " vertex: " + getContext().getTaskVertexName() +
+            " taskIndex: " + getContext().getTaskIndex() +
+            " taskAttempt: " + getContext().getTaskAttemptNumber();
         LOG.info(msg);
     }
     for (Map.Entry<String, LogicalOutput> entry : outputs.entrySet()) {
@@ -231,28 +241,28 @@ public class TestProcessor implements LogicalIOProcessor {
       output.write(sum);
     }
     
-    LOG.info("Output for DAG: " + processorContext.getDAGName() 
-        + " vertex: " + processorContext.getTaskVertexName()
-        + " task: " + processorContext.getTaskIndex()
-        + " attempt: " + processorContext.getTaskAttemptNumber()
+    LOG.info("Output for DAG: " + getContext().getDAGName() 
+        + " vertex: " + getContext().getTaskVertexName()
+        + " task: " + getContext().getTaskIndex()
+        + " attempt: " + getContext().getTaskAttemptNumber()
         + " is: " + sum);
     if (verifyTaskIndices
-        .contains(new Integer(processorContext.getTaskIndex()))) {
+        .contains(new Integer(getContext().getTaskIndex()))) {
       if (verifyValue != -1 && verifyValue != sum) {
         // expected output value set and not equal to observed value
         String msg = "Expected output mismatch of current FailingProcessor: " 
-                     + processorContext.getUniqueIdentifier() + 
-                     " dag: " + processorContext.getDAGName() +
-                     " vertex: " + processorContext.getTaskVertexName() +
-                     " taskIndex: " + processorContext.getTaskIndex() +
-                     " taskAttempt: " + processorContext.getTaskAttemptNumber();
+                     + getContext().getUniqueIdentifier() + 
+                     " dag: " + getContext().getDAGName() +
+                     " vertex: " + getContext().getTaskVertexName() +
+                     " taskIndex: " + getContext().getTaskIndex() +
+                     " taskAttempt: " + getContext().getTaskAttemptNumber();
         msg += "\n" + "Expected output: " + verifyValue + " got: " + sum;
         throwException(msg);
       } else {
-        LOG.info("Verified output for DAG: " + processorContext.getDAGName()
-            + " vertex: " + processorContext.getTaskVertexName() + " task: "
-            + processorContext.getTaskIndex() + " attempt: "
-            + processorContext.getTaskAttemptNumber() + " is: " + sum);
+        LOG.info("Verified output for DAG: " + getContext().getDAGName()
+            + " vertex: " + getContext().getTaskVertexName() + " task: "
+            + getContext().getTaskIndex() + " attempt: "
+            + getContext().getTaskAttemptNumber() + " is: " + sum);
       }
     }
   }


Mime
View raw message