tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-857. Split Input/Output interfaces into user/framework components. (sseth)
Date Mon, 21 Jul 2014 18:03:34 GMT
Repository: tez
Updated Branches:
  refs/heads/master 2cbd8a235 -> 57b5875ab


TEZ-857. Split Input/Output interfaces into user/framework components.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/57b5875a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/57b5875a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/57b5875a

Branch: refs/heads/master
Commit: 57b5875ab7122269f26b665e022be3df59142068
Parents: 2cbd8a2
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Jul 21 11:03:18 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Jul 21 11:03:18 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/runtime/api/AbstractLogicalInput.java   | 13 ++--
 .../tez/runtime/api/AbstractLogicalOutput.java  | 13 ++--
 .../java/org/apache/tez/runtime/api/Input.java  | 47 +-----------
 .../runtime/api/InputFrameworkInterface.java    | 79 ++++++++++++++++++++
 .../apache/tez/runtime/api/LogicalInput.java    | 12 +--
 .../api/LogicalInputFrameworkInterface.java     | 33 ++++++++
 .../apache/tez/runtime/api/LogicalOutput.java   | 10 +--
 .../api/LogicalOutputFrameworkInterface.java    | 33 ++++++++
 .../tez/runtime/api/MergedLogicalInput.java     |  2 +-
 .../java/org/apache/tez/runtime/api/Output.java | 45 ++---------
 .../runtime/api/OutputFrameworkInterface.java   | 73 ++++++++++++++++++
 .../org/apache/tez/runtime/api/Processor.java   |  4 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 76 ++++++++++---------
 .../OnFileUnorderedPartitionedKVOutput.java     |  8 +-
 .../apache/tez/test/dag/MultiAttemptDAG.java    | 24 ++----
 16 files changed, 304 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ab09444..d03e1f5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,7 @@ INCOMPATIBLE CHANGES
     refactors public methods used for this conversion.
   TEZ-696. Remove implicit copying of processor payload to input and output
   TEZ-1269. TaskScheduler prematurely releases containers
+  TEZ-857. Split Input/Output interfaces into user/framework components.
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
index e079587..7afd61d 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
@@ -20,11 +20,15 @@ package org.apache.tez.runtime.api;
 import java.util.List;
 
 /**
- * The abstract implementation of {@link LogicalInput}. It includes default
- * implementations of few methods for the convenience.
+ * An abstract class which should be the base class for all implementations of LogicalInput.
+ *
+ * This class implements the framework facing as well as user facing methods which need to
be
+ * implemented by all LogicalInputs.
+ *
+ * This includes default implementations of a new method for convenience.
  * 
  */
-public abstract class AbstractLogicalInput implements LogicalInput {
+public abstract class AbstractLogicalInput implements LogicalInput, LogicalInputFrameworkInterface
{
 
   protected int numPhysicalInputs;
   protected TezInputContext inputContext;
@@ -49,5 +53,4 @@ public abstract class AbstractLogicalInput implements LogicalInput {
   public TezInputContext getContext() {
     return inputContext;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
index abdf2ae..c9ae11d 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
@@ -20,11 +20,15 @@ package org.apache.tez.runtime.api;
 import java.util.List;
 
 /**
- * The abstract implementation of {@link LogicalOutput}. It includes default
- * implementations of few methods for the convenience.
- * 
+ * An abstract class which should be the base class for all implementations of LogicalOutput.
+ *
+ * This class implements the framework facing as well as user facing methods which need to
be
+ * implemented by all LogicalOutputs.
+ *
+ * This includes default implementations of a new method for convenience.
+ *
  */
-public abstract class AbstractLogicalOutput implements LogicalOutput {
+public abstract class AbstractLogicalOutput implements LogicalOutput, LogicalOutputFrameworkInterface
{
 
   protected int numPhysicalOutputs;
   protected TezOutputContext outputContext;
@@ -49,5 +53,4 @@ public abstract class AbstractLogicalOutput implements LogicalOutput {
   public TezOutputContext getContext() {
     return outputContext;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
index f397114..7cc2197 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Input.java
@@ -18,37 +18,16 @@
 
 package org.apache.tez.runtime.api;
 
-import java.util.List;
-
 /**
  * Represents an input through which a TezProcessor receives data on an edge.
  * </p>
- * 
- * <code>Input</code> classes must have a 0 argument public constructor for Tez
- * to construct the <code>Input</code>. Tez will take care of initializing and
- * closing the Input after a {@link Processor} completes. </p>
- * 
- * During initialization, Inputs must specify an initial memory requirement via
- * {@link TezInputContext}.requestInitialMemory
- * 
- * Inputs must also inform the framework once they are ready to be consumed.
- * This typically means that the Processor will not block when reading from the
- * corresponding Input. This is done via {@link TezInputContext}.inputIsReady.
- * Inputs choose the policy on when they are ready.
+ *
+ * This interface has methods which can be used by a {@link org.apache.tez.runtime.api.Processor}
+ * to control execution of this Input and read data from it.
+ *
  */
 public interface Input {
 
-  /**
-   * Initializes the <code>Input</code>.
-   *
-   * @param inputContext
-   *          the {@link TezInputContext}
-   * @return list of events that were generated during initialization
-   * @throws Exception
-   *           if an error occurs
-   */
-  public List<Event> initialize(TezInputContext inputContext)
-      throws Exception;
 
   /**
    * Start any processing that the Input may need to perform. It is the
@@ -76,22 +55,4 @@ public interface Input {
    *           if an error occurs
    */
   public Reader getReader() throws Exception;
-
-  /**
-   * Handles user and system generated {@link Event}s, which typically carry
-   * information such as an output being available on the previous vertex.
-   *
-   * @param inputEvents
-   *          the list of {@link Event}s
-   */
-  public void handleEvents(List<Event> inputEvents) throws Exception;
-
-  /**
-   * Closes the <code>Input</code>
-   *
-   * @return list of events that were generated during close
-   * @throws Exception
-   *           if an error occurs
-   */
-  public List<Event> close() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
b/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
new file mode 100644
index 0000000..bf5d373
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
@@ -0,0 +1,79 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.api;
+
+import java.util.List;
+
+
+/**
+ * Represents an input through which a TezProcessor receives data on an edge.
+ * <p/>
+ *
+ * This interface has methods which are used by the Tez framework to control the Input.
+ * <p/>
+ *
+ * During initialization, Inputs must specify an initial memory requirement via
+ * {@link TezInputContext}.requestInitialMemory
+ * <p/>
+ *
+ * <code>Input</code> classes must have a 0 argument public constructor for Tez
+ * to construct the <code>Input</code>. Tez will take care of initializing and
+ * closing the Input after a {@link Processor} completes. </p>
+ * <p/>
+ *
+ * Inputs must also inform the framework once they are ready to be consumed.
+ * This typically means that the Processor will not block when reading from the
+ * corresponding Input. This is done via {@link TezInputContext}.inputIsReady.
+ * Inputs choose the policy on when they are ready.
+ */
+public interface InputFrameworkInterface {
+  /**
+   * Initializes the <code>Input</code>.
+   *
+   * @param inputContext
+   *          the {@link TezInputContext}
+   * @return list of events that were generated during initialization
+   * @throws Exception
+   *           if an error occurs
+   */
+  public List<Event> initialize(TezInputContext inputContext)
+      throws Exception;
+
+  /**
+   * Handles user and system generated {@link Event}s, which typically carry
+   * information such as an output being available on the previous vertex.
+   *
+   * @param inputEvents
+   *          the list of {@link Event}s
+   */
+  public void handleEvents(List<Event> inputEvents) throws Exception;
+
+
+
+  /**
+   * Closes the <code>Input</code>
+   *
+   * @return list of events that were generated during close
+   * @throws Exception
+   *           if an error occurs
+   */
+  public List<Event> close() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInput.java
index 3382d12..74fc156 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInput.java
@@ -19,19 +19,9 @@
 package org.apache.tez.runtime.api;
 
 /**
- * An @link {@link Input} which handles all incoming physical connections on an
+ * An {@link Input} which handles all incoming physical connections on an
  * edge. A {@link LogicalIOProcessor} sees a single Logical Input per incoming
  * edge.
  */
 public interface LogicalInput extends Input {
-
-  /**
-   * Sets the number of physical inputs that this <code>LogicalInput</code> will
-   * receive. This will be called by the Tez framework before initializing the
-   * <code>LogicalInput</code>
-   * 
-   * @param numInputs
-   *          the number of physical inputs.
-   */
-  public void setNumPhysicalInputs(int numInputs);
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java
b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java
new file mode 100644
index 0000000..96aa256
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalInputFrameworkInterface.java
@@ -0,0 +1,33 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface LogicalInputFrameworkInterface extends InputFrameworkInterface {
+  /**
+   * Sets the number of physical inputs that this <code>LogicalInput</code> will
+   * receive. This will be called by the Tez framework before initializing the
+   * <code>LogicalInput</code>
+   *
+   * @param numInputs
+   *          the number of physical inputs.
+   */
+  public void setNumPhysicalInputs(int numInputs);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
index a8d0ce3..90b64a2 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutput.java
@@ -24,13 +24,5 @@ package org.apache.tez.runtime.api;
  * edge.
  */
 public interface LogicalOutput extends Output {
-  /**
-   * Sets the number of physical ouputs that this <code>LogicalOutput</code>
-   * will receive. This will be called by the Tez framework before initializing
-   * the <code>LogicalOutput</code>
-   * 
-   * @param numOutputs
-   *          the number of physical outputs
-   */
-  public void setNumPhysicalOutputs(int numOutputs);
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java
b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java
new file mode 100644
index 0000000..bc110c8
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/LogicalOutputFrameworkInterface.java
@@ -0,0 +1,33 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.api;
+
+public interface LogicalOutputFrameworkInterface extends OutputFrameworkInterface {
+  /**
+   * Sets the number of physical outputs that this <code>LogicalOutput</code>
+   * will receive. This will be called by the Tez framework before initializing
+   * the <code>LogicalOutput</code>
+   *
+   * @param numOutputs
+   *          the number of physical outputs
+   */
+  public void setNumPhysicalOutputs(int numOutputs);
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index 2513589..ce2688c 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * This Input is not initialized or closed. It is only expected to provide a 
  * merged view of the real inputs. It cannot send or receive events
  */
-public abstract class MergedLogicalInput implements LogicalInput {
+public abstract class MergedLogicalInput implements LogicalInput, LogicalInputFrameworkInterface
{
 
   // TODO Remove with TEZ-866
   private volatile InputReadyCallback inputReadyCallback;

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
index 76af6f6..77b1609 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Output.java
@@ -18,33 +18,17 @@
 
 package org.apache.tez.runtime.api;
 
-import java.util.List;
-
 /**
- * Represents an Output through which a TezProcessor writes information on an
- * edge. </p>
+ * Represents an Output through which a TezProcessor writes information to an edge.
+ * </p>
  *
- * <code>Output</code> implementations must have a 0 argument public constructor
- * for Tez to construct the <code>Output</code>. Tez will take care of
- * initializing and closing the Input after a {@link Processor} completes. </p>
+ * This interface has methods which can be used by a {@link org.apache.tez.runtime.api.Processor}
+ * to control execution of this Output and write data to it.
  *
- * During initialization, Outputs must specify an initial memory requirement via
- * {@link TezOutputContext}.requestInitialMemory
- * 
  */
 public interface Output {
 
-  /**
-   * Initializes the <code>Output</code>
-   *
-   * @param outputContext
-   *          the {@link TezOutputContext}
-   * @return list of events that were generated during initialization
-   * @throws Exception
-   *           if an error occurs
-   */
-  public List<Event> initialize(TezOutputContext outputContext)
-      throws Exception;
+
 
   /**
    * Start any processing that the Output may need to perform. It is the
@@ -72,21 +56,4 @@ public interface Output {
    */
   public Writer getWriter() throws Exception;
 
-  /**
-   * Handles user and system generated {@link Event}s, which typically carry
-   * information such as a downstream vertex being ready to consume input.
-   *
-   * @param outputEvents
-   *          the list of {@link Event}s
-   */
-  public void handleEvents(List<Event> outputEvents);
-
-  /**
-   * Closes the <code>Output</code>
-   *
-   * @return list of events that were generated during close
-   * @throws Exception
-   *           if an error occurs
-   */
-  public List<Event> close() throws Exception;
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
new file mode 100644
index 0000000..ed37f6d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
@@ -0,0 +1,73 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.tez.runtime.api;
+
+import java.util.List;
+
+/**
+ * Represents the Tez framework part of an {@link org.apache.tez.runtime.api.Output}.
+ * <p/>
+ *
+ * This interface has methods which are used by the Tez framework to control the Output.
+ * <p/>
+ *
+ * During initialization, Outputs must specify an initial memory requirement via
+ * {@link TezOutputContext}.requestInitialMemory
+ * <p/>
+ *
+ * <code>Output</code> classes must have a 0 argument public constructor for
Tez
+ * to construct the <code>Output</code>. Tez will take care of initializing and
+ * closing the Output after a {@link Processor} completes. </p>
+ * <p/>
+ *
+ */
+public interface OutputFrameworkInterface {
+
+  /**
+   * Initializes the <code>Output</code>
+   *
+   * @param outputContext
+   *          the {@link TezOutputContext}
+   * @return list of events that were generated during initialization
+   * @throws Exception
+   *           if an error occurs
+   */
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws Exception;
+
+  /**
+   * Handles user and system generated {@link Event}s, which typically carry
+   * information such as a downstream vertex being ready to consume input.
+   *
+   * @param outputEvents
+   *          the list of {@link Event}s
+   */
+  public void handleEvents(List<Event> outputEvents);
+
+  /**
+   * Closes the <code>Output</code>
+   *
+   * @return list of events that were generated during close
+   * @throws Exception
+   *           if an error occurs
+   */
+  public List<Event> close() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
index fcb200c..49f35cd 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/Processor.java
@@ -30,7 +30,7 @@ public interface Processor {
    * Initializes the <code>Processor</code>
    *
    * @param processorContext
-   * @throws IOException
+   * @throws java.io.IOException
    *           if an error occurs
    */
   public void initialize(TezProcessorContext processorContext)
@@ -47,7 +47,7 @@ public interface Processor {
   /**
    * Closes the <code>Processor</code>
    *
-   * @throws IOException
+   * @throws java.io.IOException
    *           if an error occurs
    */
   public void close() throws Exception;

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 122cc23..4784e42 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -49,11 +49,15 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.InputFrameworkInterface;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalInputFrameworkInterface;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.LogicalOutputFrameworkInterface;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.Output;
+import org.apache.tez.runtime.api.OutputFrameworkInterface;
 import org.apache.tez.runtime.api.Processor;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.TezOutputContext;
@@ -93,34 +97,36 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final List<OutputSpec> outputSpecs;
   private final ConcurrentHashMap<String, LogicalOutput> outputsMap;
   private final ConcurrentHashMap<String, TezOutputContext> outputContextMap;
-  
+
   private final List<GroupInputSpec> groupInputSpecs;
   private ConcurrentHashMap<String, MergedLogicalInput> groupInputsMap;
-  
+
   private final ProcessorDescriptor processorDescriptor;
   private final LogicalIOProcessor processor;
   private TezProcessorContext processorContext;
-  
+
   private final MemoryDistributor initialMemoryDistributor;
 
   /** Maps which will be provided to the processor run method */
   private final LinkedHashMap<String, LogicalInput> runInputMap;
   private final LinkedHashMap<String, LogicalOutput> runOutputMap;
-  
+
   private final Map<String, ByteBuffer> serviceConsumerMetadata;
-  
+
   private final ExecutorService initializerExecutor;
   private final CompletionService<Void> initializerCompletionService;
-  
+
   private final Multimap<String, String> startedInputsMap;
 
   private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
   private Thread eventRouterThread = null;
 
   private final int appAttemptNumber;
-  
+
   private final InputReadyTracker inputReadyTracker;
 
+  // KKK Make sure LogicalInputFramework checks are in place
+
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
       Map<String, ByteBuffer> serviceConsumerMetadata,
@@ -149,7 +155,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.state = State.NEW;
     this.appAttemptNumber = appAttemptNumber;
     int numInitializers = numInputs + numOutputs; // Processor is initialized in the main
thread.
-    numInitializers = (numInitializers == 0 ? 1 : numInitializers); 
+    numInitializers = (numInitializers == 0 ? 1 : numInitializers);
     this.initializerExecutor = Executors.newFixedThreadPool(
         numInitializers,
         new ThreadFactoryBuilder().setDaemon(true)
@@ -169,26 +175,26 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     LOG.info("Initializing LogicalProcessorIORuntimeTask");
     Preconditions.checkState(this.state == State.NEW, "Already initialized");
     this.state = State.INITED;
-    
+
     int numTasks = 0;
-    
+
     int inputIndex = 0;
     for (InputSpec inputSpec : taskSpec.getInputs()) {
       this.initializerCompletionService.submit(
           new InitializeInputCallable(inputSpec, inputIndex++));
       numTasks++;
     }
-    
+
     int outputIndex = 0;
     for (OutputSpec outputSpec : taskSpec.getOutputs()) {
       this.initializerCompletionService.submit(
           new InitializeOutputCallable(outputSpec, outputIndex++));
       numTasks++;
     }
-    
+
     // Initialize processor in the current thread.
     initializeLogicalIOProcessor();
-    
+
     int completedTasks = 0;
     while (completedTasks < numTasks) {
       LOG.info("Waiting for " + (numTasks-completedTasks) + " initializers to finish");
@@ -211,14 +217,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.inputReadyTracker
         .setGroupedInputs(groupInputsMap == null ? null : groupInputsMap.values());
     // Grouped input start will be controlled by the start of the GroupedInput
-    
+
     // Construct the set of groupedInputs up front so that start is not invoked on them.
     Set<String> groupInputs = Sets.newHashSet();
     // Construct Inputs/Outputs map argument for processor.run()
     // first add the group inputs
     if (groupInputSpecs !=null && !groupInputSpecs.isEmpty()) {
       for (GroupInputSpec groupInputSpec : groupInputSpecs) {
-        runInputMap.put(groupInputSpec.getGroupName(), 
+        runInputMap.put(groupInputSpec.getGroupName(),
                                  groupInputsMap.get(groupInputSpec.getGroupName()));
         groupInputs.addAll(groupInputSpec.getGroupVertices());
       }
@@ -244,7 +250,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       }
     }
 
-    if (groupInputSpecs != null) {      
+    if (groupInputSpecs != null) {
       for (GroupInputSpec group : groupInputSpecs) {
         if (!inputAlreadyStarted(taskSpec.getVertexName(), group.getGroupName())) {
           numAutoStarts++;
@@ -258,7 +264,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
     // Shutdown after all tasks complete.
     this.initializerExecutor.shutdown();
-    
+
     completedTasks = 0;
     LOG.info("Num IOs determined for AutoStart: " + numAutoStarts);
     while (completedTasks < numAutoStarts) {
@@ -277,7 +283,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
     LOG.info("AutoStartComplete");
 
-    
+
 
     // then add the non-grouped inputs
     for (InputSpec inputSpec : inputSpecs) {
@@ -292,10 +298,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       String outputName = outputSpec.getDestinationVertexName();
       runOutputMap.put(outputName, output);
     }
-    
+
     // TODO Maybe close initialized inputs / outputs in case of failure to
     // initialize.
-  
+
     startRouterThread();
   }
 
@@ -321,7 +327,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       // Close the Inputs.
       for (InputSpec inputSpec : inputSpecs) {
         String srcVertexName = inputSpec.getSourceVertexName();
-        List<Event> closeInputEvents = inputsMap.get(srcVertexName).close();
+        List<Event> closeInputEvents = ((InputFrameworkInterface)inputsMap.get(srcVertexName)).close();
         sendTaskGeneratedEvents(closeInputEvents,
             EventProducerConsumerType.INPUT, taskSpec.getVertexName(),
             srcVertexName, taskSpec.getTaskAttemptID());
@@ -330,7 +336,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       // Close the Outputs.
       for (OutputSpec outputSpec : outputSpecs) {
         String destVertexName = outputSpec.getDestinationVertexName();
-        List<Event> closeOutputEvents = outputsMap.get(destVertexName).close();
+        List<Event> closeOutputEvents = ((LogicalOutputFrameworkInterface)outputsMap.get(destVertexName)).close();
         sendTaskGeneratedEvents(closeOutputEvents,
             EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(),
             destVertexName, taskSpec.getTaskAttemptID());
@@ -362,12 +368,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       inputsMap.put(edgeName, input);
       inputContextMap.put(edgeName, inputContext);
 
-      if (input instanceof LogicalInput) {
-        ((LogicalInput) input).setNumPhysicalInputs(inputSpec
+      if (input instanceof LogicalInputFrameworkInterface) {
+        ((LogicalInputFrameworkInterface) input).setNumPhysicalInputs(inputSpec
             .getPhysicalEdgeCount());
       }
       LOG.info("Initializing Input with src edge: " + edgeName);
-      List<Event> events = input.initialize(inputContext);
+      List<Event> events = ((InputFrameworkInterface)input).initialize(inputContext);
       sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT,
           inputContext.getTaskVertexName(), inputContext.getSourceVertexName(),
           taskSpec.getTaskAttemptID());
@@ -379,12 +385,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private class StartInputCallable implements Callable<Void> {
     private final LogicalInput input;
     private final String srcVertexName;
-    
+
     public StartInputCallable(LogicalInput input, String srcVertexName) {
       this.input = input;
       this.srcVertexName = srcVertexName;
     }
-    
+
     @Override
     public Void call() throws Exception {
       LOG.info("Starting Input with src edge: " + srcVertexName);
@@ -413,12 +419,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       outputsMap.put(edgeName, output);
       outputContextMap.put(edgeName, outputContext);
 
-      if (output instanceof LogicalOutput) {
-        ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
+      if (output instanceof LogicalOutputFrameworkInterface) {
+        ((LogicalOutputFrameworkInterface) output).setNumPhysicalOutputs(outputSpec
             .getPhysicalEdgeCount());
       }
       LOG.info("Initializing Output with dest edge: " + edgeName);
-      List<Event> events = output.initialize(outputContext);
+      List<Event> events = ((OutputFrameworkInterface)output).initialize(outputContext);
       sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT,
           outputContext.getTaskVertexName(),
           outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID());
@@ -436,7 +442,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private void initializeGroupInputs() {
-    if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {      
+    if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
      groupInputsMap = new ConcurrentHashMap<String, MergedLogicalInput>(groupInputSpecs.size());
      for (GroupInputSpec groupInputSpec : groupInputSpecs) {
         LOG.info("Initializing GroupInput using GroupInputSpec: " + groupInputSpec);
@@ -451,7 +457,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
       }
     }
   }
-  
+
   private void initializeLogicalIOProcessor() throws Exception {
     LOG.info("Initializing processor" + ", processorClassName="
         + processorDescriptor.getClassName());
@@ -511,7 +517,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     }
     return (LogicalInput)input;
   }
-  
+
   private LogicalOutput createOutput(OutputSpec outputSpec) {
     LOG.info("Creating Output");
     Output output = ReflectionUtils.createClazzInstance(outputSpec
@@ -573,7 +579,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         LogicalInput input = inputsMap.get(
             e.getDestinationInfo().getEdgeVertexName());
         if (input != null) {
-          input.handleEvents(Collections.singletonList(e.getEvent()));
+          ((InputFrameworkInterface)input).handleEvents(Collections.singletonList(e.getEvent()));
         } else {
           throw new TezUncheckedException("Unhandled event for invalid target: "
               + e);
@@ -583,7 +589,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
         LogicalOutput output = outputsMap.get(
             e.getDestinationInfo().getEdgeVertexName());
         if (output != null) {
-          output.handleEvents(Collections.singletonList(e.getEvent()));
+          ((OutputFrameworkInterface)output).handleEvents(Collections.singletonList(e.getEvent()));
         } else {
           throw new TezUncheckedException("Unhandled event for invalid target: "
               + e);

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
index ff4901d..5f7ac48 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezOutputContext;
@@ -44,7 +45,7 @@ import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWrite
  * write Key-Value pairs. The key-value pairs are written to the correct partition based
on the
  * configured Partitioner.
  */
-public class OnFileUnorderedPartitionedKVOutput implements LogicalOutput {
+public class OnFileUnorderedPartitionedKVOutput extends AbstractLogicalOutput {
 
   private static final Log LOG = LogFactory.getLog(OnFileUnorderedPartitionedKVOutput.class);
 
@@ -70,6 +71,11 @@ public class OnFileUnorderedPartitionedKVOutput implements LogicalOutput
{
   }
 
   @Override
+  public List<Event> initialize() throws Exception {
+    return null;
+  }
+
+  @Override
   public synchronized void start() throws Exception {
     if (!isStarted.get()) {
       memoryUpdateCallbackHandler.validateUpdateReceived();

http://git-wip-us.apache.org/repos/asf/tez/blob/57b5875a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index bb81f82..51d3d9e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -38,15 +38,13 @@ import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
 import org.apache.tez.dag.api.client.VertexStatus.State;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.AbstractLogicalOutput;
 import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.LogicalInput;
-import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
 import org.apache.tez.runtime.api.Reader;
-import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 import org.apache.tez.runtime.api.TezRootInputInitializerContext;
 import org.apache.tez.runtime.api.Writer;
@@ -227,15 +225,10 @@ public class MultiAttemptDAG {
     }
   }
 
-  public static class NoOpInput implements LogicalInput, MemoryUpdateCallback {
+  public static class NoOpInput extends AbstractLogicalInput implements MemoryUpdateCallback
{
 
     @Override
-    public void setNumPhysicalInputs(int numInputs) {
-
-    }
-
-    @Override
-    public List<Event> initialize(TezInputContext inputContext) throws Exception {
+    public List<Event> initialize() throws Exception {
       inputContext.requestInitialMemory(1l, this);
       return null;
     }
@@ -266,15 +259,10 @@ public class MultiAttemptDAG {
     }
   }
 
-  public static class NoOpOutput implements LogicalOutput, MemoryUpdateCallback {
-
-    @Override
-    public void setNumPhysicalOutputs(int numOutputs) {
-
-    }
+  public static class NoOpOutput extends AbstractLogicalOutput implements MemoryUpdateCallback
{
 
     @Override
-    public List<Event> initialize(TezOutputContext outputContext) throws Exception
{
+    public List<Event> initialize() throws Exception {
       outputContext.requestInitialMemory(1l, this);
       return null;
     }


Mime
View raw message