+ * {@link InputDataInformationEvent}s for the actual Input.
* If this is not specified, the parallelism must be set for the
* vertex. In addition, the Input should know how to access data for
- * each of it's tasks. If a {@link TezRootInputInitializer} is
+ * each of its tasks. If a {@link InputInitializer} is
* meant to determine the parallelism of the vertex, the initial
* vertex parallelism should be set to -1. Can be null.
*/
@@ -67,10 +67,10 @@ public class DataSourceDescriptor {
* @param inputInitializerDescriptor
* An initializer for this Input which may run within the AM. This
* can be used to set the parallelism for this vertex and generate
- * {@link RootInputDataInformationEvent}s for the actual Input.
+ * {@link InputDataInformationEvent}s for the actual Input.
* If this is not specified, the parallelism must be set for the
* vertex. In addition, the Input should know how to access data for
- * each of it's tasks. If a {@link TezRootInputInitializer} is
+ * each of its tasks. If a {@link InputInitializer} is
* meant to determine the parallelism of the vertex, the initial
* vertex parallelism should be set to -1. Can be null.
* @param numShards Number of shards of data
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
deleted file mode 100644
index c447ca5..0000000
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.api;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputReadErrorEvent;
-
-/**
- * This interface defines the routing of the event between tasks of producer and
- * consumer vertices. The routing is bi-directional. Users can customize the
- * routing by providing an implementation of this interface.
- *
- * Implementations must provide a 0 argument public constructor.
- */
-@InterfaceStability.Unstable
-public abstract class EdgeManager {
-
- private final EdgeManagerContext context;
-
- /**
- * Crete an instance of the VertexManagerPlugin. Classes extending this one to create a
- * VertexManagerPlugin, must provide the same constructor so that Tez can create an instance of
- * the class at runtime.
- *
- * @param context the context within which this EdgeManager will run. Includes
- * information like configuration which the user may have specified
- * while setting up the edge.
- */
- public EdgeManager(EdgeManagerContext context) {
- this.context = context;
- }
-
- /**
- * Initializes the EdgeManager. This method is called in the following
- * circumstances 1. when initializing an Edge Manager for the first time.
- * 2. When an EdgeManager is replaced at runtime. At this point, an
- * EdgeManager instance is created and setup by the user. The initialize
- * method will be called with the original {@link EdgeManagerContext} when the
- * edgeManager is replaced.
- *
- */
- public abstract void initialize();
-
- /**
- * Get the number of physical inputs on the destination task
- * @param destinationTaskIndex Index of destination task for which number of
- * inputs is needed
- * @return Number of physical inputs on the destination task
- */
- public abstract int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex);
-
- /**
- * Get the number of physical outputs on the source task
- * @param sourceTaskIndex Index of the source task for which number of outputs
- * is needed
- * @return Number of physical outputs on the source task
- */
- public abstract int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex);
-
- /**
- * Return the routing information to inform consumers about the source task
- * output that is now available. The return map has the routing information.
- * The event will be routed to every destination task index in the key of the
- * map. Every physical input in the value for that task key will receive the
- * input.
- *
- * @param event
- * Data movement event that contains the output information
- * @param sourceTaskIndex
- * Source task that produced the event
- * @param sourceOutputIndex
- * Index of the physical output on the source task that produced the
- * event
- * @param destinationTaskAndInputIndices
- * Map via which the routing information is returned
- */
- public abstract void routeDataMovementEventToDestination(DataMovementEvent event,
- int sourceTaskIndex, int sourceOutputIndex,
- Map<Integer, List<Integer>> destinationTaskAndInputIndices);
-
- /**
- * Return the routing information to inform consumers about the failure of a
- * source task whose outputs have been potentially lost. The return map has
- * the routing information. The failure notification event will be sent to
- * every task index in the key of the map. Every physical input in the value
- * for that task key will receive the failure notification. This method will
- * be called once for every source task failure and information for all
- * affected destinations must be provided in that invocation.
- *
- * @param sourceTaskIndex
- * Source task
- * @param destinationTaskAndInputIndices
- * Map via which the routing information is returned
- */
- public abstract void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
- Map<Integer, List<Integer>> destinationTaskAndInputIndices);
-
- /**
- * Get the number of destination tasks that consume data from the source task
- * @param sourceTaskIndex Source task index
- */
- public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex);
-
- /**
- * Return the source task index to which to send the input error event
- *
- * @param event
- * Input read error event. Has more information about the error
- * @param destinationTaskIndex
- * Destination task that reported the error
- * @param destinationFailedInputIndex
- * Index of the physical input on the destination task that reported
- * the error
- * @return Index of the source task that created the unavailable input
- */
- public abstract int routeInputErrorEventToSource(InputReadErrorEvent event,
- int destinationTaskIndex, int destinationFailedInputIndex);
-
- /**
- * Return ahe {@link org.apache.tez.dag.api.EdgeManagerContext} for this specific instance of
- * the vertex manager.
- *
- * @return the {@link org.apache.tez.dag.api.EdgeManagerContext} for the input
- */
- public EdgeManagerContext getContext() {
- return this.context;
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java
deleted file mode 100644
index 4158647..0000000
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-/**
- * Context information provided to Edge plugins on initialization.
- *
- */
-public interface EdgeManagerContext {
-
- /**
- * Returns the byte payload specified by the user for the edge.
- * @return the byte payload specified by the user
- */
- public byte[] getUserPayload();
-
- /**
- * Returns the source vertex name
- * @return the source vertex name
- */
- public String getSourceVertexName();
-
- /**
- * Returns the destination vertex name
- * @return the destination vertex name
- */
- public String getDestinationVertexName();
-
- /**
- * Returns the number of tasks in the source vertex
- */
- public int getSourceVertexNumTasks();
-
- /**
- * Returns the number of tasks in the destination vertex
- */
- public int getDestinationVertexNumTasks();
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
deleted file mode 100644
index 058e8e8..0000000
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-public class EdgeManagerDescriptor extends TezEntityDescriptor {
-
- public EdgeManagerDescriptor(String edgeManagerClassName) {
- super(edgeManagerClassName);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
new file mode 100644
index 0000000..07bf3fe
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java
@@ -0,0 +1,147 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+/**
+ * This abstract class defines the routing of events between tasks of producer and
+ * consumer vertices. The routing is bi-directional. Users can customize the
+ * routing by providing a subclass of this class.
+ */
+@InterfaceStability.Unstable
+public abstract class EdgeManagerPlugin {
+
+ private final EdgeManagerPluginContext context;
+
+ /**
+ * Create an instance of the EdgeManagerPlugin. Classes extending this to
+ * create an EdgeManagerPlugin must provide the same constructor so that Tez
+ * can create an instance of the class at runtime.
+ *
+ * @param context
+ * the context within which this EdgeManagerPlugin will run. Includes
+ * information like configuration which the user may have specified
+ * while setting up the edge.
+ */
+ public EdgeManagerPlugin(EdgeManagerPluginContext context) {
+ this.context = context;
+ }
+
+ /**
+ * Initializes the EdgeManagerPlugin. This method is called in the following
+ * circumstances 1. when initializing an EdgeManagerPlugin for the first time.
+ * 2. When an EdgeManagerPlugin is replaced at runtime. At this point, an
+ * EdgeManagerPlugin instance is created and setup by the user. The initialize
+ * method will be called with the original {@link EdgeManagerPluginContext} when the
+ * EdgeManagerPlugin is replaced.
+ *
+ */
+ public abstract void initialize();
+
+ /**
+ * Get the number of physical inputs on the destination task
+ * @param destinationTaskIndex Index of destination task for which number of
+ * inputs is needed
+ * @return Number of physical inputs on the destination task
+ */
+ public abstract int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex);
+
+ /**
+ * Get the number of physical outputs on the source task
+ * @param sourceTaskIndex Index of the source task for which number of outputs
+ * is needed
+ * @return Number of physical outputs on the source task
+ */
+ public abstract int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex);
+
+ /**
+ * Return the routing information to inform consumers about the source task
+ * output that is now available. The return map has the routing information.
+ * The event will be routed to every destination task index in the key of the
+ * map. Every physical input in the value for that task key will receive the
+ * input.
+ *
+ * @param event
+ * Data movement event that contains the output information
+ * @param sourceTaskIndex
+ * Source task that produced the event
+ * @param sourceOutputIndex
+ * Index of the physical output on the source task that produced the
+ * event
+ * @param destinationTaskAndInputIndices
+ * Map via which the routing information is returned
+ */
+ public abstract void routeDataMovementEventToDestination(DataMovementEvent event,
+ int sourceTaskIndex, int sourceOutputIndex,
+ Map<Integer, List<Integer>> destinationTaskAndInputIndices);
+
+ /**
+ * Return the routing information to inform consumers about the failure of a
+ * source task whose outputs have been potentially lost. The return map has
+ * the routing information. The failure notification event will be sent to
+ * every task index in the key of the map. Every physical input in the value
+ * for that task key will receive the failure notification. This method will
+ * be called once for every source task failure and information for all
+ * affected destinations must be provided in that invocation.
+ *
+ * @param sourceTaskIndex
+ * Source task
+ * @param destinationTaskAndInputIndices
+ * Map via which the routing information is returned
+ */
+ public abstract void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
+ Map<Integer, List<Integer>> destinationTaskAndInputIndices);
+
+ /**
+ * Get the number of destination tasks that consume data from the source task
+ * @param sourceTaskIndex Source task index
+ */
+ public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex);
+
+ /**
+ * Return the source task index to which to send the input error event
+ *
+ * @param event
+ * Input read error event. Has more information about the error
+ * @param destinationTaskIndex
+ * Destination task that reported the error
+ * @param destinationFailedInputIndex
+ * Index of the physical input on the destination task that reported
+ * the error
+ * @return Index of the source task that created the unavailable input
+ */
+ public abstract int routeInputErrorEventToSource(InputReadErrorEvent event,
+ int destinationTaskIndex, int destinationFailedInputIndex);
+
+ /**
+ * Return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this specific instance of
+ * the edge manager plugin.
+ *
+ * @return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this plugin
+ */
+ public EdgeManagerPluginContext getContext() {
+ return this.context;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
new file mode 100644
index 0000000..d55ea6d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+/**
+ * Context information provided to {@link EdgeManagerPlugin}s.
+ * This interface is not supposed to be implemented by users.
+ *
+ */
+public interface EdgeManagerPluginContext {
+
+ /**
+ * Returns the payload specified by the user for the edge.
+ * @return the payload specified by the user
+ */
+ public byte[] getUserPayload();
+
+ /**
+ * Returns the source vertex name
+ * @return the source vertex name
+ */
+ public String getSourceVertexName();
+
+ /**
+ * Returns the destination vertex name
+ * @return the destination vertex name
+ */
+ public String getDestinationVertexName();
+
+ /**
+ * Returns the number of tasks in the source vertex
+ */
+ public int getSourceVertexNumTasks();
+
+ /**
+ * Returns the number of tasks in the destination vertex
+ */
+ public int getDestinationVertexNumTasks();
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginDescriptor.java
new file mode 100644
index 0000000..083326f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginDescriptor.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+public class EdgeManagerPluginDescriptor extends TezEntityDescriptor {
+
+ public EdgeManagerPluginDescriptor(String edgeManagerPluginClassName) {
+ super(edgeManagerPluginClassName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
index 6f0c66d..6ed3c92 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
@@ -96,7 +96,7 @@ public class EdgeProperty {
final SchedulingType schedulingType;
final InputDescriptor inputDescriptor;
final OutputDescriptor outputDescriptor;
- final EdgeManagerDescriptor edgeManagerDescriptor;
+ final EdgeManagerPluginDescriptor edgeManagerDescriptor;
/**
* @param dataMovementType
@@ -135,7 +135,7 @@ public class EdgeProperty {
* @param edgeDestination
* The {@link InputDescriptor} which will consume data from the edge.
*/
- public EdgeProperty(EdgeManagerDescriptor edgeManagerDescriptor,
+ public EdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor,
DataSourceType dataSourceType,
SchedulingType schedulingType,
OutputDescriptor edgeSource,
@@ -176,10 +176,10 @@ public class EdgeProperty {
/**
* Returns the Edge Manager specifications for this edge.
- * @return @link {@link EdgeManagerDescriptor} if a custom edge was setup, null otherwise.
+ * @return {@link EdgeManagerPluginDescriptor} if a custom edge was set up, null otherwise.
*/
@Private
- public EdgeManagerDescriptor getEdgeManagerDescriptor() {
+ public EdgeManagerPluginDescriptor getEdgeManagerDescriptor() {
return edgeManagerDescriptor;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
index 6aec78e..40e2cc0 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java
@@ -37,13 +37,13 @@ public abstract class VertexManagerPlugin {
private final VertexManagerPluginContext context;
/**
- * Crete an instance of the VertexManagerPlugin. Classes extending this one to create a
- * VertexManagerPlugin, must provide the same constructor so that Tez can create an instance of
- * the class at runtime.
- *
- * @param context vertex manager plugin context which can be used to access the payload,
- * vertex
- * properties, etc
+ * Create an instance of the VertexManagerPlugin. Classes extending this to
+ * create a VertexManagerPlugin, must provide the same constructor so that Tez
+ * can create an instance of the class at runtime.
+ *
+ * @param context
+ * vertex manager plugin context which can be used to access the
+ * payload, vertex properties, etc
*/
public VertexManagerPlugin(VertexManagerPluginContext context) {
this.context = context;
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
index 7c48adc..8a29691 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
-import org.apache.tez.runtime.api.RootInputSpecUpdate;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import com.google.common.base.Preconditions;
@@ -127,8 +127,8 @@ public interface VertexManagerPluginContext {
*/
public boolean setVertexParallelism(int parallelism,
@Nullable VertexLocationHint locationHint,
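- @Nullable Map<String, EdgeManagerDescriptor> sourceEdgeManagers,
- @Nullable Map<String, RootInputSpecUpdate> rootInputSpecUpdate);
+ @Nullable Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
+ @Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate);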
/**
* Allows a VertexManagerPlugin to assign Events for Root Inputs
@@ -143,7 +143,7 @@ public interface VertexManagerPluginContext {
* the Vertex. The target index on individual events represents the
* task to which events need to be sent.
*/
- public void addRootInputEvents(String inputName, Collection<RootInputDataInformationEvent> events);
+ public void addRootInputEvents(String inputName, Collection<InputDataInformationEvent> events);
/**
* Notify the vertex to start the given tasks
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
index 173952e..0af498d 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
@@ -27,24 +27,24 @@ import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Public
public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor,
LogicalIOProcessorFrameworkInterface {
- private final TezProcessorContext context;
+ private final ProcessorContext context;
/**
* Constructor an instance of the LogicalProcessor. Classes extending this one to create a
* LogicalProcessor, must provide the same constructor so that Tez can create an instance of the
* class at runtime.
*
- * @param context the {@link org.apache.tez.runtime.api.TezProcessorContext} which provides
+ * @param context the {@link org.apache.tez.runtime.api.ProcessorContext} which provides
* the Processor with context information within the running task.
*/
- public AbstractLogicalIOProcessor(TezProcessorContext context) {
+ public AbstractLogicalIOProcessor(ProcessorContext context) {
this.context = context;
}
@Override
public abstract void initialize() throws Exception;
- public final TezProcessorContext getContext() {
+ public final ProcessorContext getContext() {
return context;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
index 8b6edda..d1facc7 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
@@ -29,7 +29,7 @@ import java.util.List;
*
* Input classes must provide a 2 argument public constructor for Tez to create the
* Input. The parameters to this constructor are 1) an instance of
- * {@link org.apache.tez.runtime.api.TezInputContext} and 2) an integer which is used to
+ * {@link org.apache.tez.runtime.api.InputContext} and 2) an integer which is used to
* setup the number of physical inputs that the logical input will see.
* Tez will take care of initializing and closing the Input after a {@link Processor} completes.
*
@@ -38,19 +38,19 @@ import java.util.List;
public abstract class AbstractLogicalInput implements LogicalInput, LogicalInputFrameworkInterface {
private final int numPhysicalInputs;
- private final TezInputContext inputContext;
+ private final InputContext inputContext;
/**
* Constructor an instance of the LogicalInput. Classes extending this one to create a
* LogicalInput, must provide the same constructor so that Tez can create an instance of the
* class at runtime.
*
- * @param inputContext the {@link org.apache.tez.runtime.api.TezInputContext} which provides
+ * @param inputContext the {@link org.apache.tez.runtime.api.InputContext} which provides
* the Input with context information within the running task.
* @param numPhysicalInputs the number of physical inputs that the logical input will
* receive. This is typically determined by Edge Routing.
*/
- public AbstractLogicalInput(TezInputContext inputContext, int numPhysicalInputs) {
+ public AbstractLogicalInput(InputContext inputContext, int numPhysicalInputs) {
this.inputContext = inputContext;
this.numPhysicalInputs = numPhysicalInputs;
}
@@ -69,12 +69,12 @@ public abstract class AbstractLogicalInput implements LogicalInput, LogicalInput
}
/**
- * Return ahe {@link org.apache.tez.runtime.api.TezInputContext} for this specific instance of
+ * Return the {@link org.apache.tez.runtime.api.InputContext} for this specific instance of
* the LogicalInput
*
- * @return the {@link org.apache.tez.runtime.api.TezInputContext} for the input
+ * @return the {@link org.apache.tez.runtime.api.InputContext} for the input
*/
- public final TezInputContext getContext() {
+ public final InputContext getContext() {
return inputContext;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
index d88e57f..317390a 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java
@@ -31,20 +31,20 @@ import java.util.List;
public abstract class AbstractLogicalOutput implements LogicalOutput, LogicalOutputFrameworkInterface {
private final int numPhysicalOutputs;
- private final TezOutputContext outputContext;
+ private final OutputContext outputContext;
/**
* Constructor an instance of the LogicalOutput. Classes extending this one to create a
* LogicalOutput, must provide the same constructor so that Tez can create an instance of the
* class at runtime.
*
- * @param outputContext the {@link org.apache.tez.runtime.api.TezOutputContext} which
+ * @param outputContext the {@link org.apache.tez.runtime.api.OutputContext} which
* provides
* the Output with context information within the running task.
* @param numPhysicalOutputs the number of physical outputs that the logical output will
* generate. This is typically determined by Edge Routing.
*/
- public AbstractLogicalOutput(TezOutputContext outputContext, int numPhysicalOutputs) {
+ public AbstractLogicalOutput(OutputContext outputContext, int numPhysicalOutputs) {
this.outputContext = outputContext;
this.numPhysicalOutputs = numPhysicalOutputs;
}
@@ -62,12 +62,12 @@ public abstract class AbstractLogicalOutput implements LogicalOutput, LogicalOut
}
/**
- * Return ahe {@link org.apache.tez.runtime.api.TezOutputContext} for this specific instance of
+ * Return the {@link org.apache.tez.runtime.api.OutputContext} for this specific instance of
* the LogicalOutput
*
- * @return the {@link org.apache.tez.runtime.api.TezOutputContext} for the output
+ * @return the {@link org.apache.tez.runtime.api.OutputContext} for the output
*/
- public final TezOutputContext getContext() {
+ public final OutputContext getContext() {
return outputContext;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java
new file mode 100644
index 0000000..1961184
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+/**
+ * Context handle for the Input to initialize itself.
+ * This interface is not supposed to be implemented by users
+ */
+public interface InputContext extends TaskContext {
+
+ /**
+ * Get the Vertex Name of the Source that generated data for this Input
+ * @return Name of the Source Vertex
+ */
+ public String getSourceVertexName();
+
+ /**
+ * Get the index of the input in the set of all inputs for the task. The
+ * index will be consistent and valid only among the tasks of this vertex.
+ * @return index
+ */
+ public int getInputIndex();
+
+ /**
+ * Inform the framework that the specific Input is ready for consumption.
+ *
+ * This method can be invoked multiple times.
+ */
+ public void inputIsReady();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
index 5192252..25b37b8 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java
@@ -31,13 +31,13 @@ import java.util.List;
*
*
* During initialization, Inputs must specify an initial memory requirement via
- * {@link TezInputContext}.requestInitialMemory
+ * {@link InputContext}.requestInitialMemory
*
*
*
* Inputs must also inform the framework once they are ready to be consumed.
* This typically means that the Processor will not block when reading from the
- * corresponding Input. This is done via {@link TezInputContext}.inputIsReady.
+ * corresponding Input. This is done via {@link InputContext}.inputIsReady.
* Inputs choose the policy on when they are ready.
*/
public interface InputFrameworkInterface {
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
new file mode 100644
index 0000000..a86dc22
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+
+/**
+ * InputInitializers are typically used to initialize vertices
+ * connected to data sources. They run in the App Master and can be used to
+ * distribute data across the tasks for the vertex, determine the number of
+ * tasks at runtime, update the Input payload etc.
+ */
+@InterfaceStability.Unstable
+public abstract class InputInitializer {
+
+ private final InputInitializerContext initializerContext;
+
+ /**
+ * Construct an instance of the InputInitializer. Classes extending this to create an
+ * InputInitializer must provide the same constructor so that Tez can create an instance of
+ * the class at runtime.
+ *
+ * @param initializerContext initializer context which can be used to access the payload, vertex
+ * properties, etc
+ */
+ public InputInitializer(InputInitializerContext initializerContext) {
+ this.initializerContext = initializerContext;
+ }
+
+ /**
+ * Run the initializer. This is the main method where
+ * initialization takes place. If an Initializer is written to accept events,
+ * a notification mechanism should be setup, with the heavy lifting of
+ * processing the event being done via this method. The moment this method
+ * returns a list of events, input initialization is considered to be
+ * complete.
+ *
+ * @return a list of events which are eventually routed to a
+ * {@link org.apache.tez.dag.api.VertexManagerPlugin} for routing
+ * @throws Exception
+ */
+ public abstract List initialize()
+ throws Exception;
+
+ /**
+ * Handle events meant for the specific Initializer. This is a notification mechanism to inform
+ * the Initializer about events received. Extensive event processing should not be performed via
+ * this method call. Instead this should just be used as a notification method to the main
+ * initialization via the initialize method.
+ *
+ * @param events list of events
+ * @throws Exception
+ */
+ public abstract void handleInputInitializerEvent(List events)
+ throws Exception;
+
+ /**
+ * Return the {@link org.apache.tez.runtime.api.InputInitializerContext}
+ * for this specific instance of the Initializer.
+ *
+ * @return the {@link org.apache.tez.runtime.api.InputInitializerContext}
+ * for the initializer
+ */
+ public final InputInitializerContext getContext() {
+ return this.initializerContext;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
new file mode 100644
index 0000000..67b6732
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * A context that provides information to the {@link InputInitializer}
+ */
+@Unstable
+public interface InputInitializerContext {
+
+ /**
+ * Get the YARN application id given to the Tez Application Master
+ * @return Application id
+ */
+ ApplicationId getApplicationId();
+
+ /**
+ * Get the name of the DAG
+ * @return DAG name
+ */
+ String getDAGName();
+
+ /**
+ * Get the name of the input
+ * @return Input name
+ */
+ String getInputName();
+
+ /**
+ * Get the user payload for the input
+ * @return User payload
+ */
+ @Nullable byte[] getInputUserPayload();
+
+ /**
+ * Get the user payload for the initializer
+ * @return User payload
+ */
+ @Nullable byte[] getUserPayload();
+
+ /**
+ * Get the number of tasks in this vertex. May be -1 if the vertex has not been
+ * initialized with a pre-determined number of tasks.
+ * @return number of tasks
+ */
+ int getNumTasks();
+
+ /**
+ * Get the resource allocated to a task of this vertex
+ * @return Resource
+ */
+ Resource getVertexTaskResource();
+
+ /**
+ * Get the total resource allocated to this vertex. If the DAG is running in
+ * a busy cluster then it may have no resources available dedicated to it. The
+ * DAG may divide its resources among member vertices.
+ * @return Resource
+ */
+ Resource getTotalAvailableResource();
+
+ /**
+ * Get the number of nodes in the cluster
+ * @return Number of nodes
+ */
+ int getNumClusterNodes();
+
+ /**
+ * @return DAG Attempt number
+ */
+ int getDAGAttemptNumber();
+
+ /**
+ * Get the number of tasks in the given vertex
+ * @param vertexName
+ * @return Total number of tasks in this vertex
+ */
+ public int getVertexNumTasks(String vertexName);
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/InputSpecUpdate.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputSpecUpdate.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputSpecUpdate.java
new file mode 100644
index 0000000..0b86ae9
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputSpecUpdate.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Update Input specs for Inputs running in a task. Allows setting the number of physical
+ * inputs for all work units if they have the same number of physical inputs, or individual
+ * numPhysicalInputs for each work unit.
+ *
+ */
+@Unstable
+public class InputSpecUpdate {
+
+ private final boolean forAllWorkUnits;
+ private final List<Integer> numPhysicalInputs;
+
+ private final static InputSpecUpdate DEFAULT_SINGLE_PHYSICAL_INPUT_SPEC = createAllTaskInputSpecUpdate(1);
+
+ /**
+ * Create an update instance where all work units (typically represented by
+ * {@link InputDataInformationEvent}) will have the same number of physical inputs.
+ *
+ * @param numPhysicalInputs
+ * the number of physical inputs for all work units which will use the LogicalInput
+ * @return an update that applies the given number of physical inputs to every work unit
+ */
+ public static InputSpecUpdate createAllTaskInputSpecUpdate(int numPhysicalInputs) {
+ return new InputSpecUpdate(numPhysicalInputs);
+ }
+
+ /**
+ * Create an update instance where each work unit (typically represented by an
+ * {@link InputDataInformationEvent}) has its own number of physical inputs.
+ *
+ * @param perWorkUnitNumPhysicalInputs
+ * A list containing one entry per work unit. The order in the list corresponds to task
+ * index or equivalently the order of {@link InputDataInformationEvent}s being sent.
+ * @return an update carrying one physical-input count per work unit
+ */
+ public static InputSpecUpdate createPerTaskInputSpecUpdate(
+ List perWorkUnitNumPhysicalInputs) {
+ return new InputSpecUpdate(perWorkUnitNumPhysicalInputs);
+ }
+
+ public static InputSpecUpdate getDefaultSinglePhysicalInputSpecUpdate() {
+ return DEFAULT_SINGLE_PHYSICAL_INPUT_SPEC;
+ }
+
+ private InputSpecUpdate(int numPhysicalInputs) {
+ this.forAllWorkUnits = true;
+ this.numPhysicalInputs = Lists.newArrayList(numPhysicalInputs);
+ }
+
+ private InputSpecUpdate(List perWorkUnitNumPhysicalInputs) {
+ this.forAllWorkUnits = false;
+ this.numPhysicalInputs = Lists.newArrayList(perWorkUnitNumPhysicalInputs);
+ }
+
+ @Private
+ public int getNumPhysicalInputsForWorkUnit(int index) {
+ if (this.forAllWorkUnits) {
+ return numPhysicalInputs.get(0);
+ } else {
+ return numPhysicalInputs.get(index);
+ }
+ }
+
+ @Private
+ /* Used for recovery serialization */
+ public boolean isForAllWorkUnits() {
+ return this.forAllWorkUnits;
+ }
+
+ @Private
+ /* Used for recovery serialization */
+ public List getAllNumPhysicalInputs() {
+ return numPhysicalInputs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
new file mode 100644
index 0000000..0d8ea61
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import javax.annotation.Nullable;
+
+/**
+ * Context for {@link MergedLogicalInput}
+ * This interface is not supposed to be implemented by users
+ */
+public interface MergedInputContext {
+
+ @Nullable
+ public byte[] getUserPayload();
+
+ /**
+ * Inform the framework that the specific Input is ready for consumption.
+ *
+ * This method can be invoked multiple times.
+ */
+ public void inputIsReady();
+
+ /**
+ * Get the work directories for the Input
+ * @return an array of work dirs
+ */
+ public String[] getWorkDirs();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index 656bb2c..fd698bd 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
* MergedLogicalInput implementations must provide a 2 argument public constructor for
* Tez to create the Input. The parameters to this constructor are 1) an instance of {@link
- * org.apache.tez.runtime.api.TezMergedInputContext} and 2) a list of constituent inputs. Tez will
+ * org.apache.tez.runtime.api.MergedInputContext} and 2) a list of constituent inputs. Tez will
* take care of initializing and closing the Input after a {@link Processor} completes.
*
*/
@@ -40,7 +40,7 @@ public abstract class MergedLogicalInput implements LogicalInput {
private AtomicBoolean notifiedInputReady = new AtomicBoolean(false);
private List inputs;
private final AtomicBoolean isStarted = new AtomicBoolean(false);
- private final TezMergedInputContext context;
+ private final MergedInputContext context;
/**
* Constructor an instance of the MergedLogicalInputs. Classes extending this one to create a
@@ -48,11 +48,11 @@ public abstract class MergedLogicalInput implements LogicalInput {
* the
* class at runtime.
*
- * @param context the {@link org.apache.tez.runtime.api.TezMergedInputContext} which provides
+ * @param context the {@link org.apache.tez.runtime.api.MergedInputContext} which provides
* the Input with context information within the running task.
* @param inputs the list of constituen Inputs.
*/
- public MergedLogicalInput(TezMergedInputContext context, List inputs) {
+ public MergedLogicalInput(MergedInputContext context, List inputs) {
this.inputs = Collections.unmodifiableList(inputs);
this.context = context;
}
@@ -61,7 +61,7 @@ public abstract class MergedLogicalInput implements LogicalInput {
return inputs;
}
- public final TezMergedInputContext getContext() {
+ public final MergedInputContext getContext() {
return context;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
index 7e21345..0b40695 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java
@@ -28,10 +28,10 @@ import org.apache.tez.dag.api.client.VertexStatus;
@InterfaceStability.Unstable
public abstract class OutputCommitter {
- private final OutputCommitterContext outputCommitterContext;
+ private final OutputCommitterContext committerContext;
/**
- * Constructor an instance of the OutputCommitter. Classes extending this one to create an
+ * Construct an instance of the OutputCommitter. Classes extending this to create an
* OutputCommitter, must provide the same constructor so that Tez can create an instance of
* the class at runtime.
*
@@ -39,7 +39,7 @@ public abstract class OutputCommitter {
* properties, etc
*/
public OutputCommitter(OutputCommitterContext committerContext) {
- this.outputCommitterContext = committerContext;
+ this.committerContext = committerContext;
}
/**
@@ -108,7 +108,7 @@ public abstract class OutputCommitter {
* @return the {@link org.apache.tez.runtime.api.OutputCommitterContext} for the input
*/
public final OutputCommitterContext getContext() {
- return this.outputCommitterContext;
+ return this.committerContext;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
index 7189c2e..05c2fd7 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java
@@ -20,15 +20,15 @@ package org.apache.tez.runtime.api;
import javax.annotation.Nullable;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* Context through which the OutputCommitter can access all the relevant
- * information that it needs.
+ * information that it needs. This interface is not supposed to be implemented
+ * by users
*/
-
+@Unstable
public interface OutputCommitterContext {
/**
@@ -77,8 +77,6 @@ public interface OutputCommitterContext {
* Get Vertex Index in the DAG
* @return Vertex index
*/
- @Unstable
- @Evolving
public int getVertexIndex();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
new file mode 100644
index 0000000..d5796e2
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+/**
+ * Context handle for the Output to initialize itself.
+ * This interface is not supposed to be implemented by users
+ */
+public interface OutputContext extends TaskContext {
+
+ /**
+ * Get the Vertex Name of the Destination that is the recipient of this
+ * Output's data
+ * @return Name of the Destination Vertex
+ */
+ public String getDestinationVertexName();
+
+ /**
+ * Get the index of the output in the set of all outputs for the task. The
+ * index will be consistent and valid only among the tasks of this vertex.
+ * @return index
+ */
+ public int getOutputIndex();
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
index 873fa0c..7f64f3e 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java
@@ -30,7 +30,7 @@ import java.util.List;
*
*
* During initialization, Outputs must specify an initial memory requirement via
- * {@link TezOutputContext}.requestInitialMemory
+ * {@link OutputContext}.requestInitialMemory
*
*
*/
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
new file mode 100644
index 0000000..6a28e1d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Context handle for the Processor to initialize itself.
+ * This interface is not supposed to be implemented by users
+ */
+public interface ProcessorContext extends TaskContext {
+
+ /**
+ * Set the overall progress of this Task Attempt
+ * @param progress Progress in the range from [0.0 - 1.0f]
+ */
+ public void setProgress(float progress);
+
+ /**
+ * Check whether this attempt can commit its output
+ * @return true if commit allowed
+ * @throws IOException
+ */
+ public boolean canCommit() throws IOException;
+
+ /**
+ * Blocking call which returns when any of the specified Inputs is ready for
+ * consumption.
+ *
+ * There can be multiple parallel invocations of this function - where each
+ * invocation blocks on the Inputs that it specifies.
+ *
+ * If multiple Inputs are ready, any one of them may be returned by this
+ * method - including an Input which may have been returned in a previous
+ * call. If invoking this method multiple times, it's recommended to remove
+ * previously completed Inputs from the invocation list.
+ *
+ * @param inputs
+ * the list of Inputs to monitor
+ * @return the Input which is ready for consumption
+ * @throws InterruptedException
+ */
+ public Input waitForAnyInputReady(Collection inputs) throws InterruptedException;
+
+ /**
+ * Blocking call which returns only after all of the specified Inputs are
+ * ready for consumption.
+ *
+ * There can be multiple parallel invocations of this function - where each
+ * invocation blocks on the Inputs that it specifies.
+ *
+ * @param inputs
+ * the list of Inputs to monitor
+ * @throws InterruptedException
+ */
+ public void waitForAllInputsReady(Collection inputs) throws InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/RootInputSpecUpdate.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/RootInputSpecUpdate.java b/tez-api/src/main/java/org/apache/tez/runtime/api/RootInputSpecUpdate.java
deleted file mode 100644
index 72adf78..0000000
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/RootInputSpecUpdate.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.api;
-
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
-
-import com.google.common.collect.Lists;
-
-/**
- * Update Input specs for Root Inputs running in a task. Allows setting the number of physical
- * inputs for all work units if they have the same number of physical inputs, or individual
- * numPhysicalInputs for each work unit.
- *
- */
-public class RootInputSpecUpdate {
-
- private final boolean forAllWorkUnits;
- private final List numPhysicalInputs;
-
- private final static RootInputSpecUpdate DEFAULT_SINGLE_PHYSICAL_INPUT_SPEC = createAllTaskRootInputSpecUpdate(1);
-
- /**
- * Create an update instance where all work units (typically represented by
- * {@link RootInputDataInformationEvent}) will have the same number of physical inputs.
- *
- * @param numPhysicalInputs
- * the number of physical inputs for all work units which will use the LogicalInput
- * @return
- */
- public static RootInputSpecUpdate createAllTaskRootInputSpecUpdate(int numPhysicalInputs) {
- return new RootInputSpecUpdate(numPhysicalInputs);
- }
-
- /**
- * Create an update instance where all work units (typically represented by
- * {@link RootInputDataInformationEvent}) will have the same number of physical inputs.
- *
- * @param perWorkUnitNumPhysicalInputs
- * A list containing one entry per work unit. The order in the list corresponds to task
- * index or equivalently the order of RootInputDataInformationEvents being sent.
- * @return
- */
- public static RootInputSpecUpdate createPerTaskRootInputSpecUpdate(
- List perWorkUnitNumPhysicalInputs) {
- return new RootInputSpecUpdate(perWorkUnitNumPhysicalInputs);
- }
-
- public static RootInputSpecUpdate getDefaultSinglePhysicalInputSpecUpdate() {
- return DEFAULT_SINGLE_PHYSICAL_INPUT_SPEC;
- }
-
- private RootInputSpecUpdate(int numPhysicalInputs) {
- this.forAllWorkUnits = true;
- this.numPhysicalInputs = Lists.newArrayList(numPhysicalInputs);
- }
-
- private RootInputSpecUpdate(List perWorkUnitNumPhysicalInputs) {
- this.forAllWorkUnits = false;
- this.numPhysicalInputs = Lists.newArrayList(perWorkUnitNumPhysicalInputs);
- }
-
- @Private
- public int getNumPhysicalInputsForWorkUnit(int index) {
- if (this.forAllWorkUnits) {
- return numPhysicalInputs.get(0);
- } else {
- return numPhysicalInputs.get(index);
- }
- }
-
- @Private
- /* Used for recovery serialization */
- public boolean isForAllWorkUnits() {
- return this.forAllWorkUnits;
- }
-
- @Private
- /* Used for recovery serialization */
- public List getAllNumPhysicalInputs() {
- return numPhysicalInputs;
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
new file mode 100644
index 0000000..56c5cfd
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+
+/**
+ * Base interface for Context classes used to initialize the Input, Output
+ * and Processor instances.
+ * This interface is not supposed to be implemented by users.
+ */
+public interface TaskContext {
+ // Scale the maximum events we fetch per RPC call to mitigate OOM issues
+ // on the ApplicationMaster when a thundering herd of reducers fetch events
+ // This should not be necessary after HADOOP-8942. TEZ-1398
+
+ /**
+ * Get the {@link ApplicationId} for the running app
+ * @return the {@link ApplicationId}
+ */
+ public ApplicationId getApplicationId();
+
+ /**
+ * Get the current DAG Attempt Number
+ * @return DAG Attempt Number
+ */
+ public int getDAGAttemptNumber();
+
+ /**
+ * Get the index of this Task among the tasks of this vertex
+ * @return Task Index
+ */
+ public int getTaskIndex();
+
+ /**
+ * Get the current Task Attempt Number
+ * @return Task Attempt Number
+ */
+ public int getTaskAttemptNumber();
+
+ /**
+ * Get the name of the DAG
+ * @return the DAG name
+ */
+ public String getDAGName();
+
+ /**
+ * Get the name of the Vertex in which the task is running
+ * @return Vertex Name
+ */
+ public String getTaskVertexName();
+
+ /**
+ * Get the index of this task's vertex in the set of vertices in the DAG. This
+ * is consistent and valid across all tasks/vertices in the same DAG.
+ * @return index
+ */
+ public int getTaskVertexIndex();
+
+ public TezCounters getCounters();
+
+ /**
+ * Send Events to the AM and/or dependent Vertices
+ * @param events Events to be sent
+ */
+ public void sendEvents(List events);
+
+ /**
+ * Get the User Payload for the Input/Output/Processor
+ * @return User Payload
+ */
+ @Nullable
+ public byte[] getUserPayload();
+
+ /**
+ * Get the work directories for the Input/Output/Processor
+ * @return an array of work dirs
+ */
+ public String[] getWorkDirs();
+
+ /**
+ * Returns an identifier which is unique to the specific Input, Processor or
+ * Output
+ *
+ * @return a unique identifier
+ */
+ public String getUniqueIdentifier();
+
+ /**
+ * Returns a shared {@link ObjectRegistry} to hold user objects in memory
+ * between tasks.
+ * @return {@link ObjectRegistry}
+ */
+ public ObjectRegistry getObjectRegistry();
+
+ /**
+ * Report a fatal error to the framework. This will cause the entire task to
+ * fail and should not be used for reporting temporary or recoverable errors.
+ * @param exception an exception representing the error; may be null
+ * @param message a message describing the error; may be null
+ */
+ public void fatalError(@Nullable Throwable exception, @Nullable String message);
+
+ /**
+ * Returns meta-data for the specified service. As an example, when the MR
+ * ShuffleHandler is used - this would return the jobToken serialized as bytes
+ *
+ * @param serviceName
+ * the name of the service for which meta-data is required
+ * @return a ByteBuffer representing the meta-data
+ */
+ public ByteBuffer getServiceConsumerMetaData(String serviceName);
+
+ /**
+ * Return Provider meta-data for the specified service. As an example, when the
+ * MR ShuffleHandler is used - this would return the shuffle port serialized
+ * as bytes
+ *
+ * @param serviceName
+ * the name of the service for which provider meta-data is required
+ * @return a ByteBuffer representing the meta-data
+ */
+ @Nullable
+ public ByteBuffer getServiceProviderMetaData(String serviceName);
+
+ /**
+ * Request a specific amount of memory during initialization
+ * (initialize(..*Context)). The requester is notified of allocation via the
+ * provided callback handler.
+ *
+ * Currently, (post TEZ-668) the caller will be informed about the available
+ * memory after initialization (I/P/O initialize(...)), and before the
+ * start/run invocation. There will be no other invocations on the callback.
+ *
+ * This method can be called only once by any component. Calling it multiple
+ * times from within the same component will result in an error.
+ *
+ * Each Input / Output must request memory. For Inputs / Outputs which do not
+ * have a specific ask, a null callback handler can be specified with a
+ * request size of 0.
+ *
+ * @param size
+ * request size in bytes.
+ * @param callbackHandler
+ * the callback handler to be invoked once memory is assigned
+ */
+ public void requestInitialMemory(long size, MemoryUpdateCallback callbackHandler);
+
+ /**
+ * Gets the total memory available to all components of the running task. This
+ * value will always be constant, and does not factor in any allocations.
+ *
+ * @return the total available memory for all components of the task
+ */
+ public long getTotalMemoryAvailableToTask();
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
deleted file mode 100644
index 1f40ed9..0000000
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.api;
-
-/**
- * Context handle for the Input to initialize itself.
- */
-public interface TezInputContext extends TezTaskContext {
-
- /**
- * Get the Vertex Name of the Source that generated data for this Input
- * @return Name of the Source Vertex
- */
- public String getSourceVertexName();
-
- /**
- * Get the index of the input in the set of all inputs for the task. The
- * index will be consistent and valid only among the tasks of this vertex.
- * @return index
- */
- public int getInputIndex();
-
- /**
- * Inform the framework that the specific Input is ready for consumption.
- *
- * This method can be invoked multiple times.
- */
- public void inputIsReady();
-}