tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: TEZ-1057. Replace interfaces with abstract classes for Processor/Input/Output classes (bikas)
Date Mon, 04 Aug 2014 20:53:31 GMT
Repository: tez
Updated Branches:
  refs/heads/master 0df3d0f6f -> e92dc7fe4


TEZ-1057. Replace interfaces with abstract classes for Processor/Input/Output classes (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e92dc7fe
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e92dc7fe
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e92dc7fe

Branch: refs/heads/master
Commit: e92dc7fe4ed611ef88f77934940cd47a5da3ea11
Parents: 0df3d0f
Author: Bikas Saha <bikas@apache.org>
Authored: Mon Aug 4 13:53:19 2014 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Mon Aug 4 13:53:19 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../runtime/api/AbstractLogicalIOProcessor.java |  5 +-
 .../tez/runtime/api/LogicalIOProcessor.java     | 17 ------
 .../LogicalIOProcessorFrameworkInterface.java   | 43 +++++++++++++++
 .../org/apache/tez/runtime/api/Processor.java   | 26 ---------
 .../api/ProcessorFrameworkInterface.java        | 56 ++++++++++++++++++++
 .../java/org/apache/tez/runtime/api/Reader.java |  2 +-
 .../java/org/apache/tez/runtime/api/Writer.java |  2 +-
 .../processor/FilterByWordInputProcessor.java   |  3 --
 .../processor/FilterByWordOutputProcessor.java  | 13 ++---
 .../tez/mapreduce/input/MultiMRInput.java       |  1 -
 .../org/apache/tez/mapreduce/lib/MRReader.java  | 14 ++---
 .../tez/mapreduce/lib/MRReaderMapReduce.java    |  2 +-
 .../tez/mapreduce/lib/MRReaderMapred.java       |  2 +-
 .../mapreduce/processor/map/MapProcessor.java   | 14 +----
 .../processor/reduce/ReduceProcessor.java       | 13 +----
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 16 +++---
 .../tez/runtime/library/api/KeyValueReader.java |  8 +--
 .../tez/runtime/library/api/KeyValueWriter.java |  4 +-
 .../runtime/library/api/KeyValuesReader.java    |  8 +--
 .../runtime/library/api/KeyValuesWriter.java    |  6 +--
 .../broadcast/output/FileBasedKVWriter.java     |  4 +-
 .../readers/ShuffledUnorderedKVReader.java      |  2 +-
 .../BaseUnorderedPartitionedKVWriter.java       |  3 +-
 .../input/ConcatenatedMergedKeyValueInput.java  |  2 +-
 .../input/ConcatenatedMergedKeyValuesInput.java |  2 +-
 .../library/input/ShuffledMergedInput.java      |  2 +-
 .../library/input/SortedGroupedMergedInput.java |  2 +-
 .../library/processor/SleepProcessor.java       |  1 +
 .../input/TestSortedGroupedMergedInput.java     |  3 +-
 .../java/org/apache/tez/test/TestProcessor.java |  2 -
 31 files changed, 148 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c862935..4a852e5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,8 @@ INCOMPATIBLE CHANGES
   TEZ-1133. Remove some unused methods from MRHelpers.
   TEZ-1346. Change Processor to require context constructors for creation, and remove the
requirement of the initialize method requiring the context.
   TEZ-1041. Use VertexLocationHint consistently everywhere in the API
+  TEZ-1057. Replace interfaces with abstract classes for
+  Processor/Input/Output classes
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
index 7688c14..173952e 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.tez.runtime.api;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 /**
  * Abstract representation of the interface {@link LogicalIOProcessor}.
  * Implements the base logic of some methods into this class.
@@ -26,8 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability;
  *
  */
 @InterfaceAudience.Public
-@InterfaceStability.Evolving
-public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor {
+public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor,
+    LogicalIOProcessorFrameworkInterface {
   private final TezProcessorContext context;
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalIOProcessor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalIOProcessor.java
index 859c2f7..cfee97b 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalIOProcessor.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalIOProcessor.java
@@ -18,26 +18,9 @@
 
 package org.apache.tez.runtime.api;
 
-import java.util.Map;
-
 /**
  * Represents a processor which consumes {@link LogicalInput}s and produces
  * {@link LogicalOutput}s
  */
 public interface LogicalIOProcessor extends Processor {
-
-  /**
-   * Runs the {@link Processor}
-   * 
-   * @param inputs
-   *          a map of the source vertex name to {@link LogicalInput} - one per
-   *          incoming edge.
-   * @param outputs
-   *          a map of the destination vertex name to {@link LogicalOutput} -
-   *          one per outgoing edge
-   * @throws Exception TODO
-   */
-  public void run(Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception;
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalIOProcessorFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalIOProcessorFrameworkInterface.java
b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalIOProcessorFrameworkInterface.java
new file mode 100644
index 0000000..ff87cec
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalIOProcessorFrameworkInterface.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import java.util.Map;
+
+/**
+ * Represents a processor framework interface which consumes {@link LogicalInput}s and produces
+ * {@link LogicalOutput}s
+ */
+public interface LogicalIOProcessorFrameworkInterface extends ProcessorFrameworkInterface
{
+
+  /**
+   * Runs the {@link Processor}
+   * 
+   * @param inputs
+   *          a map of the source vertex name to {@link LogicalInput} - one per
+   *          incoming edge.
+   * @param outputs
+   *          a map of the destination vertex name to {@link LogicalOutput} -
+   *          one per outgoing edge
+   * @throws Exception TODO
+   */
+  public void run(Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
index 358bd43..ca9100a 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
@@ -18,35 +18,9 @@
 
 package org.apache.tez.runtime.api;
 
-import java.util.List;
-
 /**
  * {@link Processor} represents the <em>Tez</em> entity responsible for
  * consuming {@link Input} and producing {@link Output}.
  */
 public interface Processor {
-
-  /**
-   * Initializes the <code>Processor</code>
-   *
-   * @throws java.lang.Exception
-   *           if an error occurs
-   */
-  public void initialize() throws Exception;
-
-  /**
-   * Handles user and system generated {@link Event}s.
-   *
-   * @param processorEvents
-   *          the list of {@link Event}s
-   */
-  public void handleEvents(List<Event> processorEvents);
-
-  /**
-   * Closes the <code>Processor</code>
-   *
-   * @throws java.io.IOException
-   *           if an error occurs
-   */
-  public void close() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
new file mode 100644
index 0000000..7854280
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import java.util.List;
+
+/**
+ * Represents the Tez framework part of an {@link org.apache.tez.runtime.api.Processor}.
+ * <p/>
+ *
+ * This interface has methods which are used by the Tez framework to control the Processor.
+ * <p/>
+ */
+public interface ProcessorFrameworkInterface {
+
+  /**
+   * Initializes the <code>Processor</code>
+   *
+   * @param processorContext
+   * @throws java.io.IOException
+   *           if an error occurs
+   */
+  public void initialize() throws Exception;
+
+  /**
+   * Handles user and system generated {@link Event}s.
+   *
+   * @param processorEvents
+   *          the list of {@link Event}s
+   */
+  public void handleEvents(List<Event> processorEvents);
+
+  /**
+   * Closes the <code>Processor</code>
+   *
+   * @throws java.io.IOException
+   *           if an error occurs
+   */
+  public void close() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-api/src/main/java/org/apache/tez/runtime/api/Reader.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Reader.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Reader.java
index dd006bc..d5602c6 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Reader.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Reader.java
@@ -21,6 +21,6 @@ package org.apache.tez.runtime.api;
 /**
  * A <code>Reader</code> represents the data being read in an {@link Input}
  */
-public interface Reader {
+public abstract class Reader {
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-api/src/main/java/org/apache/tez/runtime/api/Writer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Writer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Writer.java
index 9604e59..a9337d5 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Writer.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Writer.java
@@ -21,6 +21,6 @@ package org.apache.tez.runtime.api;
 /**
  * A <code>Writer</code> represents the data being written by an {@link Output}
  */
-public interface Writer {
+public abstract class Writer {
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index 0314e83..e88ed2f 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -101,9 +101,6 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor
{
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work
with OnFileUnorderedKVOutput");
     }
 
-    
-    
-    
     MRInputLegacy mrInput = (MRInputLegacy) li;
     mrInput.init();
     OnFileUnorderedKVOutput kvOutput = (OnFileUnorderedKVOutput) lo;

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
index bf07735..e277839 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
@@ -19,13 +19,12 @@
 package org.apache.tez.processor;
 
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
@@ -35,7 +34,7 @@ import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 
 
-public class FilterByWordOutputProcessor extends AbstractLogicalIOProcessor {
+public class FilterByWordOutputProcessor extends SimpleMRProcessor {
 
   private static final Log LOG = LogFactory.getLog(MapProcessor.class);
 
@@ -59,8 +58,7 @@ public class FilterByWordOutputProcessor extends AbstractLogicalIOProcessor
{
   }
 
   @Override
-  public void run(Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception {
+  public void run() throws Exception {
     
     if (inputs.size() != 1) {
       throw new IllegalStateException("FilterByWordOutputProcessor processor can only work
with a single input");
@@ -98,10 +96,5 @@ public class FilterByWordOutputProcessor extends AbstractLogicalIOProcessor
{
 
       kvWriter.write(key, value);
     }
-    if (getContext().canCommit()) {
-      mrOutput.commit();
-    } else {
-      mrOutput.abort();
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
index b4df3f0..c6ef81f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java
@@ -26,7 +26,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
index f8a5a5e..8a20827 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReader.java
@@ -24,11 +24,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 @Private
-public interface MRReader extends KeyValueReader {
-  void setSplit(Object split) throws IOException;
-  boolean isSetup();
-  float getProgress() throws IOException, InterruptedException;
-  void close() throws IOException;
-  Object getSplit();
-  Object getRecordReader();
+public abstract class MRReader extends KeyValueReader {
+  public abstract void setSplit(Object split) throws IOException;
+  public abstract boolean isSetup();
+  public abstract float getProgress() throws IOException, InterruptedException;
+  public abstract void close() throws IOException;
+  public abstract Object getSplit();
+  public abstract Object getRecordReader();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
index 076f801..4689858 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.RecordReader;
 
 import com.google.common.base.Preconditions;
 
-public class MRReaderMapReduce implements MRReader {
+public class MRReaderMapReduce extends MRReader {
 
   private final TezCounter inputRecordCounter;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
index e2800ce..127e60f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
@@ -36,7 +36,7 @@ import org.apache.tez.mapreduce.input.MRInput;
 
 import com.google.common.base.Preconditions;
 
-public class MRReaderMapred implements MRReader {
+public class MRReaderMapred extends MRReader {
 
   private static final Log LOG = LogFactory.getLog(MRReaderMapred.class);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index ca6dbe2..0b6dfc0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -44,7 +44,6 @@ import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
@@ -53,7 +52,7 @@ import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
-public class MapProcessor extends MRTask {
+public class MapProcessor extends MRTask{
 
   private static final Log LOG = LogFactory.getLog(MapProcessor.class);
 
@@ -62,17 +61,6 @@ public class MapProcessor extends MRTask {
   }
 
   @Override
-  public void initialize()
-      throws IOException {
-    try {
-      super.initialize();
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-
-  @Override
   public void handleEvents(List<Event> processorEvents) {
     // TODO Auto-generated method stub
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index e4a2d93..41954f5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -66,17 +66,6 @@ public class ReduceProcessor extends MRTask {
   }
 
   @Override
-  public void initialize()
-      throws IOException {
-    try {
-      super.initialize();
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-
-  @Override
   public void handleEvents(List<Event> processorEvents) {
     // TODO Auto-generated method stub
 
@@ -353,4 +342,4 @@ public class ReduceProcessor extends MRTask {
     jobConf.setBoolean(JobContext.TASK_ISMAP, false);
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index b0d6ffa..92a0f07 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -48,6 +48,7 @@ import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.InputFrameworkInterface;
@@ -105,7 +106,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
 
   private final ProcessorDescriptor processorDescriptor;
-  private LogicalIOProcessor processor;
+  private AbstractLogicalIOProcessor processor;
   private TezProcessorContext processorContext;
 
   private final MemoryDistributor initialMemoryDistributor;
@@ -320,8 +321,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
           "Can only run while in INITED state. Current: " + this.state);
       this.state = State.RUNNING;
     }
-    LogicalIOProcessor lioProcessor = (LogicalIOProcessor) processor;
-    lioProcessor.run(runInputMap, runOutputMap);
+    processor.run(runInputMap, runOutputMap);
   }
 
   public void close() throws Exception {
@@ -553,16 +553,16 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     return (LogicalOutput) output;
   }
 
-  private LogicalIOProcessor createProcessor(
+  private AbstractLogicalIOProcessor createProcessor(
       String processorClassName, TezProcessorContext processorContext) {
     Processor processor = ReflectionUtils.createClazzInstance(processorClassName,
         new Class[]{TezProcessorContext.class}, new Object[]{processorContext});
-    if (!(processor instanceof LogicalIOProcessor)) {
+    if (!(processor instanceof AbstractLogicalIOProcessor)) {
       throw new TezUncheckedException(processor.getClass().getName()
-          + " is not a sub-type of LogicalIOProcessor."
-          + " Only LogicalIOProcessor sub-types supported by LogicalIOProcessorRuntimeTask.");
+          + " is not a sub-type of AbstractLogicalIOProcessor."
+          + " Only AbstractLogicalIOProcessor sub-types supported by LogicalIOProcessorRuntimeTask.");
     }
-    return (LogicalIOProcessor) processor;
+    return (AbstractLogicalIOProcessor) processor;
   }
 
   private void sendTaskGeneratedEvents(List<Event> events,

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
index 762d8df..e025362 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
@@ -33,7 +33,7 @@ import org.apache.tez.runtime.api.Reader;
  * </code>
  *
  */
-public interface KeyValueReader extends Reader {
+public abstract class KeyValueReader extends Reader {
 
   /**
    * Moves to the next key/values(s) pair
@@ -42,14 +42,14 @@ public interface KeyValueReader extends Reader {
    * @throws IOException
    *           if an error occurs
    */
-  public boolean next() throws IOException;
+  public abstract boolean next() throws IOException;
 
   
   /**
    * Returns the current key
    * @return the current key
    */
-  public Object getCurrentKey() throws IOException;
+  public abstract Object getCurrentKey() throws IOException;
   
   
   /**
@@ -57,5 +57,5 @@ public interface KeyValueReader extends Reader {
    * @return the current value
    * @throws IOException
    */
-  public Object getCurrentValue() throws IOException;
+  public abstract Object getCurrentValue() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
index 235f361..f67dbe0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
@@ -25,7 +25,7 @@ import org.apache.tez.runtime.api.Writer;
 /**
  * A key/value(s) pair based {@link Writer}
  */
-public interface KeyValueWriter extends Writer {
+public abstract class KeyValueWriter extends Writer {
   /**
    * Writes a key/value pair.
    * 
@@ -36,5 +36,5 @@ public interface KeyValueWriter extends Writer {
    * @throws IOException
    *           if an error occurs
    */
-  public void write(Object key, Object value) throws IOException;
+  public abstract void write(Object key, Object value) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
index 7861b75..171c5ec 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
@@ -33,7 +33,7 @@ import org.apache.tez.runtime.api.Reader;
  * </code>
  *
  */
-public interface KeyValuesReader extends Reader {
+public abstract class KeyValuesReader extends Reader {
 
   /**
    * Moves to the next key/values(s) pair
@@ -42,18 +42,18 @@ public interface KeyValuesReader extends Reader {
    * @throws IOException
    *           if an error occurs
    */
-  public boolean next() throws IOException;
+  public abstract boolean next() throws IOException;
 
   
   /**
    * Returns the current key
    * @return the current key
    */
-  public Object getCurrentKey() throws IOException;
+  public abstract Object getCurrentKey() throws IOException;
   
   /**
    * Returns an Iterable view of the values associated with the current key
    * @return an Iterable view of the values associated with the current key
    */
-  public Iterable<Object> getCurrentValues() throws IOException;
+  public abstract Iterable<Object> getCurrentValues() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
index 83d5c25..aca7966 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
@@ -18,14 +18,12 @@
 
 package org.apache.tez.runtime.library.api;
 
-import org.apache.tez.runtime.api.Writer;
-
 import java.io.IOException;
 
 /**
  * key/value(s) based {@link KeyValueWriter}
  */
-public interface KeyValuesWriter extends KeyValueWriter {
+public abstract class KeyValuesWriter extends KeyValueWriter {
 
   /**
    * Writes a key and its associated values
@@ -36,5 +34,5 @@ public interface KeyValuesWriter extends KeyValueWriter {
    *          values to write
    * @throws java.io.IOException
    */
-  public void write(Object key, Iterable<Object> values) throws IOException;
+  public abstract void write(Object key, Iterable<Object> values) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index cc01477..5f48739 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -19,7 +19,6 @@
 package org.apache.tez.runtime.library.broadcast.output;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,7 +34,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.TezOutputContext;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesWriter;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
@@ -46,7 +44,7 @@ import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 
 import com.google.common.base.Preconditions;
 
-public class FileBasedKVWriter implements KeyValuesWriter {
+public class FileBasedKVWriter extends KeyValuesWriter {
 
   private static final Log LOG = LogFactory.getLog(FileBasedKVWriter.class);
   

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
index 796890c..11cc71a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/ShuffledUnorderedKVReader.java
@@ -37,7 +37,7 @@ import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
 import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
 import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
 
-public class ShuffledUnorderedKVReader<K, V> implements KeyValueReader {
+public class ShuffledUnorderedKVReader<K, V> extends KeyValueReader {
 
   private static final Log LOG = LogFactory.getLog(ShuffledUnorderedKVReader.class);
   

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
index 32b6dae..f784760 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -34,7 +34,6 @@ import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezOutputContext;
-import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.api.KeyValuesWriter;
 import org.apache.tez.runtime.library.api.Partitioner;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -43,7 +42,7 @@ import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 
 @SuppressWarnings("rawtypes")
-public abstract class BaseUnorderedPartitionedKVWriter implements KeyValuesWriter {
+public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
 
   private static final Log LOG = LogFactory.getLog(BaseUnorderedPartitionedKVWriter.class);
   

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 5c2e82f..0012bae 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -35,7 +35,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput
{
     super(context, inputs);
   }
 
-  public class ConcatenatedMergedKeyValueReader implements KeyValueReader {
+  public class ConcatenatedMergedKeyValueReader extends KeyValueReader {
     private int currentReaderIndex = 0;
     private KeyValueReader currentReader;
     

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 3c56d07..9bcadd8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -35,7 +35,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput
{
     super(context, inputs);
   }
 
-  public class ConcatenatedMergedKeyValuesReader implements KeyValuesReader {
+  public class ConcatenatedMergedKeyValuesReader extends KeyValuesReader {
     private int currentReaderIndex = 0;
     private KeyValuesReader currentReader;
     

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index cad67cd..4c457bf 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -269,7 +269,7 @@ public class ShuffledMergedInput extends AbstractLogicalInput {
   }
 
   @SuppressWarnings("rawtypes")
-  private static class ShuffledMergedKeyValuesReader implements KeyValuesReader {
+  private static class ShuffledMergedKeyValuesReader extends KeyValuesReader {
 
     private final ValuesIterator valuesIter;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java
index 197664e..f7d9c7f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java
@@ -68,7 +68,7 @@ public class SortedGroupedMergedInput extends MergedLogicalInput {
     }
   }
 
-  private static class SortedGroupedMergedKeyValuesReader implements KeyValuesReader {
+  private static class SortedGroupedMergedKeyValuesReader extends KeyValuesReader {
     private final PriorityQueue<KeyValuesReader> pQueue;
     @SuppressWarnings("rawtypes")
     private final RawComparator keyComparator;

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index f0e69ad..7b3d9b1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -118,4 +118,5 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
       return timeToSleepMS;
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
index 890b342..834ad29 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java
@@ -28,7 +28,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.io.RawComparator;
 import org.apache.tez.runtime.InputReadyTracker;
@@ -392,7 +391,7 @@ public class TestSortedGroupedMergedInput {
     }
   }
 
-  private static class SortedTestKeyValuesReader implements KeyValuesReader {
+  private static class SortedTestKeyValuesReader extends KeyValuesReader {
 
     final int[] keys;
     final int[][] values;

http://git-wip-us.apache.org/repos/asf/tez/blob/e92dc7fe/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
index f0f7594..9231d37 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestProcessor.java
@@ -29,11 +29,9 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.api.TezTaskContext;
 
 import com.google.common.collect.Sets;
 


Mime
View raw message