Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1AFD011FB8 for ; Mon, 11 Aug 2014 17:59:08 +0000 (UTC) Received: (qmail 30590 invoked by uid 500); 11 Aug 2014 17:59:08 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 30552 invoked by uid 500); 11 Aug 2014 17:59:07 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 30543 invoked by uid 99); 11 Aug 2014 17:59:07 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Aug 2014 17:59:07 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 77C4D9AB928; Mon, 11 Aug 2014 17:59:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.apache.org Date: Mon, 11 Aug 2014 17:59:10 -0000 Message-Id: <191864b9dc53420f911504da86d26532@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/5] git commit: TEZ-1237. Consolidate naming of API classes (bikas) TEZ-1237. 
Consolidate naming of API classes (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/27e5b6c6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/27e5b6c6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/27e5b6c6 Branch: refs/heads/master Commit: 27e5b6c686870a72d30c1613376ea908761ea40f Parents: c171220 Author: Bikas Saha Authored: Mon Aug 11 10:58:55 2014 -0700 Committer: Bikas Saha Committed: Mon Aug 11 10:58:55 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/DagTypeConverters.java | 4 +- .../tez/dag/api/DataSourceDescriptor.java | 12 +- .../org/apache/tez/dag/api/EdgeManager.java | 148 --------------- .../apache/tez/dag/api/EdgeManagerContext.java | 55 ------ .../tez/dag/api/EdgeManagerDescriptor.java | 27 --- .../apache/tez/dag/api/EdgeManagerPlugin.java | 147 +++++++++++++++ .../tez/dag/api/EdgeManagerPluginContext.java | 56 ++++++ .../dag/api/EdgeManagerPluginDescriptor.java | 27 +++ .../org/apache/tez/dag/api/EdgeProperty.java | 8 +- .../apache/tez/dag/api/VertexManagerPlugin.java | 14 +- .../tez/dag/api/VertexManagerPluginContext.java | 10 +- .../runtime/api/AbstractLogicalIOProcessor.java | 8 +- .../tez/runtime/api/AbstractLogicalInput.java | 14 +- .../tez/runtime/api/AbstractLogicalOutput.java | 12 +- .../apache/tez/runtime/api/InputContext.java | 46 +++++ .../runtime/api/InputFrameworkInterface.java | 4 +- .../tez/runtime/api/InputInitializer.java | 87 +++++++++ .../runtime/api/InputInitializerContext.java | 102 +++++++++++ .../apache/tez/runtime/api/InputSpecUpdate.java | 103 +++++++++++ .../tez/runtime/api/MergedInputContext.java | 44 +++++ .../tez/runtime/api/MergedLogicalInput.java | 10 +- .../apache/tez/runtime/api/OutputCommitter.java | 8 +- .../tez/runtime/api/OutputCommitterContext.java | 8 +- .../apache/tez/runtime/api/OutputContext.java | 41 +++++ .../runtime/api/OutputFrameworkInterface.java 
| 2 +- .../tez/runtime/api/ProcessorContext.java | 74 ++++++++ .../tez/runtime/api/RootInputSpecUpdate.java | 101 ----------- .../org/apache/tez/runtime/api/TaskContext.java | 180 ++++++++++++++++++ .../apache/tez/runtime/api/TezInputContext.java | 45 ----- .../tez/runtime/api/TezMergedInputContext.java | 40 ---- .../tez/runtime/api/TezOutputContext.java | 40 ---- .../tez/runtime/api/TezProcessorContext.java | 74 -------- .../runtime/api/TezRootInputInitializer.java | 83 --------- .../api/TezRootInputInitializerContext.java | 97 ---------- .../apache/tez/runtime/api/TezTaskContext.java | 181 ------------------- .../events/InputConfigureVertexTasksEvent.java | 54 ++++++ .../api/events/InputDataInformationEvent.java | 94 ++++++++++ .../api/events/InputInitializerEvent.java | 86 +++++++++ .../api/events/InputUpdatePayloadEvent.java | 46 +++++ .../RootInputConfigureVertexTasksEvent.java | 52 ------ .../events/RootInputDataInformationEvent.java | 92 ---------- .../api/events/RootInputInitializerEvent.java | 83 --------- .../api/events/RootInputUpdatePayloadEvent.java | 44 ----- .../runtime/api/events/VertexManagerEvent.java | 2 + .../org/apache/tez/dag/api/TestDAGPlan.java | 4 +- .../org/apache/tez/dag/api/TestDAGVerify.java | 2 +- .../app/dag/RootInputInitializerManager.java | 28 +-- .../java/org/apache/tez/dag/app/dag/Vertex.java | 8 +- .../dag/app/dag/impl/BroadcastEdgeManager.java | 8 +- .../org/apache/tez/dag/app/dag/impl/Edge.java | 18 +- .../dag/app/dag/impl/OneToOneEdgeManager.java | 8 +- .../app/dag/impl/RootInputVertexManager.java | 28 +-- .../app/dag/impl/ScatterGatherEdgeManager.java | 8 +- .../TezRootInputInitializerContextImpl.java | 4 +- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 40 ++-- .../tez/dag/app/dag/impl/VertexManager.java | 16 +- .../VertexDataMovementEventsGeneratedEvent.java | 4 +- .../events/VertexParallelismUpdatedEvent.java | 34 ++-- .../dag/impl/TestRootInputVertexManager.java | 12 +- .../tez/dag/app/dag/impl/TestVertexImpl.java | 96 
+++++----- .../TestHistoryEventsProtoConversion.java | 26 +-- .../tez/runtime/task/TestTaskExecution.java | 4 +- .../org/apache/tez/test/EdgeManagerForTest.java | 10 +- .../apache/tez/examples/OrderedWordCount.java | 6 +- .../examples/BroadcastAndOneToOneExample.java | 6 +- .../mapreduce/examples/FilterLinesByWord.java | 4 +- .../examples/FilterLinesByWordOneToOne.java | 4 +- .../mapreduce/examples/IntersectDataGen.java | 4 +- .../mapreduce/examples/IntersectExample.java | 6 +- .../mapreduce/examples/IntersectValidate.java | 4 +- .../examples/TestOrderedWordCount.java | 4 +- .../tez/mapreduce/examples/UnionExample.java | 6 +- .../tez/mapreduce/examples/WordCount.java | 6 +- .../processor/FilterByWordInputProcessor.java | 4 +- .../processor/FilterByWordOutputProcessor.java | 4 +- .../tez/mapreduce/combine/MRCombiner.java | 16 +- .../common/MRInputAMSplitGenerator.java | 28 +-- .../common/MRInputSplitDistributor.java | 26 +-- .../hadoop/mapreduce/MapContextImpl.java | 4 +- .../mapreduce/TaskInputOutputContextImpl.java | 4 +- .../org/apache/tez/mapreduce/input/MRInput.java | 18 +- .../tez/mapreduce/input/MRInputLegacy.java | 10 +- .../tez/mapreduce/input/MultiMRInput.java | 10 +- .../tez/mapreduce/input/base/MRInputBase.java | 4 +- .../apache/tez/mapreduce/output/MROutput.java | 4 +- .../tez/mapreduce/output/MROutputLegacy.java | 4 +- .../apache/tez/mapreduce/processor/MRTask.java | 8 +- .../tez/mapreduce/processor/MRTaskReporter.java | 18 +- .../mapreduce/processor/SimpleMRProcessor.java | 4 +- .../mapreduce/processor/map/MapProcessor.java | 4 +- .../processor/reduce/ReduceProcessor.java | 4 +- .../common/TestMRInputSplitDistributor.java | 32 ++-- .../tez/mapreduce/input/TestMultiMRInput.java | 24 +-- .../processor/map/TestMapProcessor.java | 4 +- .../org/apache/tez/common/ProtoConverters.java | 18 +- .../runtime/LogicalIOProcessorRuntimeTask.java | 58 +++--- .../apache/tez/runtime/api/impl/TezEvent.java | 12 +- .../runtime/api/impl/TezInputContextImpl.java | 4 +- 
.../api/impl/TezMergedInputContextImpl.java | 4 +- .../runtime/api/impl/TezOutputContextImpl.java | 4 +- .../api/impl/TezProcessorContextImpl.java | 4 +- .../runtime/api/impl/TezTaskContextImpl.java | 4 +- .../common/resources/MemoryDistributor.java | 30 +-- .../tez/runtime/TestInputReadyTracker.java | 10 +- .../TestLogicalIOProcessorRuntimeTask.java | 12 +- .../common/resources/TestMemoryDistributor.java | 46 ++--- .../vertexmanager/ShuffleVertexManager.java | 18 +- .../broadcast/output/FileBasedKVWriter.java | 4 +- .../runtime/library/common/TezRuntimeUtils.java | 10 +- .../common/localshuffle/LocalShuffle.java | 6 +- .../library/common/shuffle/impl/Fetcher.java | 4 +- .../common/shuffle/impl/MergeManager.java | 6 +- .../library/common/shuffle/impl/Shuffle.java | 6 +- .../shuffle/impl/ShuffleInputEventHandler.java | 6 +- .../common/shuffle/impl/ShuffleScheduler.java | 6 +- .../common/sort/impl/ExternalSorter.java | 6 +- .../common/sort/impl/PipelinedSorter.java | 4 +- .../common/sort/impl/dflt/DefaultSorter.java | 4 +- .../BaseUnorderedPartitionedKVWriter.java | 6 +- .../writers/UnorderedPartitionedKVWriter.java | 4 +- .../OrderedPartitionedKVEdgeConfigurer.java | 4 +- .../UnorderedPartitionedKVEdgeConfigurer.java | 4 +- .../UnorderedUnpartitionedKVEdgeConfigurer.java | 4 +- .../input/ConcatenatedMergedKeyValueInput.java | 4 +- .../input/ConcatenatedMergedKeyValuesInput.java | 4 +- .../runtime/library/input/LocalMergedInput.java | 4 +- .../library/input/ShuffledMergedInput.java | 4 +- .../input/ShuffledMergedInputLegacy.java | 4 +- .../library/input/ShuffledUnorderedKVInput.java | 4 +- .../library/input/SortedGroupedMergedInput.java | 4 +- .../library/output/LocalOnFileSorterOutput.java | 4 +- .../library/output/OnFileSortedOutput.java | 4 +- .../library/output/OnFileUnorderedKVOutput.java | 4 +- .../OnFileUnorderedPartitionedKVOutput.java | 4 +- .../library/processor/PreWarmProcessor.java | 4 +- .../library/processor/SimpleProcessor.java | 4 +- 
.../library/processor/SleepProcessor.java | 4 +- .../impl/ShuffleInputEventHandlerImpl.java | 4 +- .../shuffle/common/impl/ShuffleManager.java | 6 +- .../vertexmanager/TestShuffleVertexManager.java | 22 +-- .../TestWeightedScalingMemoryDistributor.java | 20 +- .../library/common/TestValuesIterator.java | 8 +- .../impl/TestShuffleInputEventHandler.java | 8 +- .../TestUnorderedPartitionedKVWriter.java | 18 +- .../input/TestSortedGroupedMergedInput.java | 4 +- .../library/output/TestOnFileSortedOutput.java | 8 +- .../output/TestOnFileUnorderedKVOutput.java | 6 +- .../impl/TestShuffleInputEventHandlerImpl.java | 10 +- .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 10 +- .../java/org/apache/tez/test/TestInput.java | 4 +- .../java/org/apache/tez/test/TestOutput.java | 4 +- .../java/org/apache/tez/test/TestProcessor.java | 6 +- .../apache/tez/test/dag/MultiAttemptDAG.java | 20 +- 154 files changed, 1828 insertions(+), 1802 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5faca2f..99d25c1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -51,6 +51,7 @@ INCOMPATIBLE CHANGES TEZ-1386. TezGroupedSplitsInputFormat should not need to be setup to enable grouping. TEZ-1394. Create example code for OrderedWordCount TEZ-1372. Fix preWarm to work after recent API changes + TEZ-1237. 
Consolidate naming of API classes Release 0.4.0-incubating: 2014-04-05 http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index c81fcb6..b56b2da 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -361,14 +361,14 @@ public class DagTypeConverters { return new VertexManagerPluginDescriptor(className).setUserPayload(bb); } - public static EdgeManagerDescriptor convertEdgeManagerDescriptorFromDAGPlan( + public static EdgeManagerPluginDescriptor convertEdgeManagerDescriptorFromDAGPlan( TezEntityDescriptorProto proto) { String className = proto.getClassName(); byte[] bb = null; if (proto.hasUserPayload()) { bb = proto.getUserPayload().toByteArray(); } - return new EdgeManagerDescriptor(className).setUserPayload(bb); + return new EdgeManagerPluginDescriptor(className).setUserPayload(bb); } public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan( http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java index 51d85ae..88304c4 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java @@ -22,8 +22,8 @@ import javax.annotation.Nullable; import org.apache.hadoop.security.Credentials; import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; -import 
org.apache.tez.runtime.api.TezRootInputInitializer; -import org.apache.tez.runtime.api.events.RootInputDataInformationEvent; +import org.apache.tez.runtime.api.InputInitializer; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; /** * Defines the input and input initializer for a data source @@ -46,10 +46,10 @@ public class DataSourceDescriptor { * @param inputInitializerDescriptor * An initializer for this Input which may run within the AM. This * can be used to set the parallelism for this vertex and generate - * {@link RootInputDataInformationEvent}s for the actual Input.

+ * {@link InputDataInformationEvent}s for the actual Input.

* If this is not specified, the parallelism must be set for the * vertex. In addition, the Input should know how to access data for - * each of it's tasks.

If a {@link TezRootInputInitializer} is + * each of it's tasks.

If a {@link InputInitializer} is * meant to determine the parallelism of the vertex, the initial * vertex parallelism should be set to -1. Can be null. */ @@ -67,10 +67,10 @@ public class DataSourceDescriptor { * @param inputInitializerDescriptor * An initializer for this Input which may run within the AM. This * can be used to set the parallelism for this vertex and generate - * {@link RootInputDataInformationEvent}s for the actual Input.

+ * {@link InputDataInformationEvent}s for the actual Input.

* If this is not specified, the parallelism must be set for the * vertex. In addition, the Input should know how to access data for - * each of it's tasks.

If a {@link TezRootInputInitializer} is + * each of it's tasks.

If a {@link InputInitializer} is * meant to determine the parallelism of the vertex, the initial * vertex parallelism should be set to -1. Can be null. * @param numShards Number of shards of data http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java deleted file mode 100644 index c447ca5..0000000 --- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManager.java +++ /dev/null @@ -1,148 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.dag.api; - -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.tez.runtime.api.events.DataMovementEvent; -import org.apache.tez.runtime.api.events.InputReadErrorEvent; - -/** - * This interface defines the routing of the event between tasks of producer and - * consumer vertices. The routing is bi-directional. Users can customize the - * routing by providing an implementation of this interface. 
- * - * Implementations must provide a 0 argument public constructor. - */ -@InterfaceStability.Unstable -public abstract class EdgeManager { - - private final EdgeManagerContext context; - - /** - * Crete an instance of the VertexManagerPlugin. Classes extending this one to create a - * VertexManagerPlugin, must provide the same constructor so that Tez can create an instance of - * the class at runtime. - * - * @param context the context within which this EdgeManager will run. Includes - * information like configuration which the user may have specified - * while setting up the edge. - */ - public EdgeManager(EdgeManagerContext context) { - this.context = context; - } - - /** - * Initializes the EdgeManager. This method is called in the following - * circumstances

1. when initializing an Edge Manager for the first time. - *

2. When an EdgeManager is replaced at runtime. At this point, an - * EdgeManager instance is created and setup by the user. The initialize - * method will be called with the original {@link EdgeManagerContext} when the - * edgeManager is replaced. - * - */ - public abstract void initialize(); - - /** - * Get the number of physical inputs on the destination task - * @param destinationTaskIndex Index of destination task for which number of - * inputs is needed - * @return Number of physical inputs on the destination task - */ - public abstract int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex); - - /** - * Get the number of physical outputs on the source task - * @param sourceTaskIndex Index of the source task for which number of outputs - * is needed - * @return Number of physical outputs on the source task - */ - public abstract int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex); - - /** - * Return the routing information to inform consumers about the source task - * output that is now available. The return map has the routing information. - * The event will be routed to every destination task index in the key of the - * map. Every physical input in the value for that task key will receive the - * input. - * - * @param event - * Data movement event that contains the output information - * @param sourceTaskIndex - * Source task that produced the event - * @param sourceOutputIndex - * Index of the physical output on the source task that produced the - * event - * @param destinationTaskAndInputIndices - * Map via which the routing information is returned - */ - public abstract void routeDataMovementEventToDestination(DataMovementEvent event, - int sourceTaskIndex, int sourceOutputIndex, - Map> destinationTaskAndInputIndices); - - /** - * Return the routing information to inform consumers about the failure of a - * source task whose outputs have been potentially lost. The return map has - * the routing information. 
The failure notification event will be sent to - * every task index in the key of the map. Every physical input in the value - * for that task key will receive the failure notification. This method will - * be called once for every source task failure and information for all - * affected destinations must be provided in that invocation. - * - * @param sourceTaskIndex - * Source task - * @param destinationTaskAndInputIndices - * Map via which the routing information is returned - */ - public abstract void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, - Map> destinationTaskAndInputIndices); - - /** - * Get the number of destination tasks that consume data from the source task - * @param sourceTaskIndex Source task index - */ - public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex); - - /** - * Return the source task index to which to send the input error event - * - * @param event - * Input read error event. Has more information about the error - * @param destinationTaskIndex - * Destination task that reported the error - * @param destinationFailedInputIndex - * Index of the physical input on the destination task that reported - * the error - * @return Index of the source task that created the unavailable input - */ - public abstract int routeInputErrorEventToSource(InputReadErrorEvent event, - int destinationTaskIndex, int destinationFailedInputIndex); - - /** - * Return ahe {@link org.apache.tez.dag.api.EdgeManagerContext} for this specific instance of - * the vertex manager. 
- * - * @return the {@link org.apache.tez.dag.api.EdgeManagerContext} for the input - */ - public EdgeManagerContext getContext() { - return this.context; - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java deleted file mode 100644 index 4158647..0000000 --- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerContext.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.dag.api; - -/** - * Context information provided to Edge plugins on initialization. - * - */ -public interface EdgeManagerContext { - - /** - * Returns the byte payload specified by the user for the edge. 
- * @return the byte payload specified by the user - */ - public byte[] getUserPayload(); - - /** - * Returns the source vertex name - * @return the source vertex name - */ - public String getSourceVertexName(); - - /** - * Returns the destination vertex name - * @return the destination vertex name - */ - public String getDestinationVertexName(); - - /** - * Returns the number of tasks in the source vertex - */ - public int getSourceVertexNumTasks(); - - /** - * Returns the number of tasks in the destination vertex - */ - public int getDestinationVertexNumTasks(); - -} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java deleted file mode 100644 index 058e8e8..0000000 --- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerDescriptor.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tez.dag.api; - -public class EdgeManagerDescriptor extends TezEntityDescriptor { - - public EdgeManagerDescriptor(String edgeManagerClassName) { - super(edgeManagerClassName); - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java new file mode 100644 index 0000000..07bf3fe --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPlugin.java @@ -0,0 +1,147 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.api; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.InputReadErrorEvent; + +/** + * This interface defines the routing of the event between tasks of producer and + * consumer vertices. The routing is bi-directional. 
Users can customize the + * routing by providing an implementation of this interface. + */ +@InterfaceStability.Unstable +public abstract class EdgeManagerPlugin { + + private final EdgeManagerPluginContext context; + + /** + * Create an instance of the EdgeManagerPlugin. Classes extending this to + * create a EdgeManagerPlugin, must provide the same constructor so that Tez + * can create an instance of the class at runtime. + * + * @param context + * the context within which this EdgeManagerPlugin will run. Includes + * information like configuration which the user may have specified + * while setting up the edge. + */ + public EdgeManagerPlugin(EdgeManagerPluginContext context) { + this.context = context; + } + + /** + * Initializes the EdgeManagerPlugin. This method is called in the following + * circumstances

1. when initializing an EdgeManagerPlugin for the first time. + *

2. When an EdgeManagerPlugin is replaced at runtime. At this point, an + * EdgeManagerPlugin instance is created and setup by the user. The initialize + * method will be called with the original {@link EdgeManagerPluginContext} when the + * EdgeManagerPlugin is replaced. + * + */ + public abstract void initialize(); + + /** + * Get the number of physical inputs on the destination task + * @param destinationTaskIndex Index of destination task for which number of + * inputs is needed + * @return Number of physical inputs on the destination task + */ + public abstract int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex); + + /** + * Get the number of physical outputs on the source task + * @param sourceTaskIndex Index of the source task for which number of outputs + * is needed + * @return Number of physical outputs on the source task + */ + public abstract int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex); + + /** + * Return the routing information to inform consumers about the source task + * output that is now available. The return map has the routing information. + * The event will be routed to every destination task index in the key of the + * map. Every physical input in the value for that task key will receive the + * input. + * + * @param event + * Data movement event that contains the output information + * @param sourceTaskIndex + * Source task that produced the event + * @param sourceOutputIndex + * Index of the physical output on the source task that produced the + * event + * @param destinationTaskAndInputIndices + * Map via which the routing information is returned + */ + public abstract void routeDataMovementEventToDestination(DataMovementEvent event, + int sourceTaskIndex, int sourceOutputIndex, + Map> destinationTaskAndInputIndices); + + /** + * Return the routing information to inform consumers about the failure of a + * source task whose outputs have been potentially lost. The return map has + * the routing information. 
The failure notification event will be sent to + * every task index in the key of the map. Every physical input in the value + * for that task key will receive the failure notification. This method will + * be called once for every source task failure and information for all + * affected destinations must be provided in that invocation. + * + * @param sourceTaskIndex + * Source task + * @param destinationTaskAndInputIndices + * Map via which the routing information is returned + */ + public abstract void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, + Map> destinationTaskAndInputIndices); + + /** + * Get the number of destination tasks that consume data from the source task + * @param sourceTaskIndex Source task index + */ + public abstract int getNumDestinationConsumerTasks(int sourceTaskIndex); + + /** + * Return the source task index to which to send the input error event + * + * @param event + * Input read error event. Has more information about the error + * @param destinationTaskIndex + * Destination task that reported the error + * @param destinationFailedInputIndex + * Index of the physical input on the destination task that reported + * the error + * @return Index of the source task that created the unavailable input + */ + public abstract int routeInputErrorEventToSource(InputReadErrorEvent event, + int destinationTaskIndex, int destinationFailedInputIndex); + + /** + * Return ahe {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for this specific instance of + * the vertex manager. 
+ * + * @return the {@link org.apache.tez.dag.api.EdgeManagerPluginContext} for the input + */ + public EdgeManagerPluginContext getContext() { + return this.context; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java new file mode 100644 index 0000000..d55ea6d --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginContext.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +/** + * Context information provided to {@link EdgeManagerPlugin}s + * This interface is not supposed to be implemented by users + * + */ +public interface EdgeManagerPluginContext { + + /** + * Returns the payload specified by the user for the edge. 
+ * @return the payload specified by the user + */ + public byte[] getUserPayload(); + + /** + * Returns the source vertex name + * @return the source vertex name + */ + public String getSourceVertexName(); + + /** + * Returns the destination vertex name + * @return the destination vertex name + */ + public String getDestinationVertexName(); + + /** + * Returns the number of tasks in the source vertex + */ + public int getSourceVertexNumTasks(); + + /** + * Returns the number of tasks in the destination vertex + */ + public int getDestinationVertexNumTasks(); + +} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginDescriptor.java new file mode 100644 index 0000000..083326f --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeManagerPluginDescriptor.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.api; + +public class EdgeManagerPluginDescriptor extends TezEntityDescriptor { + + public EdgeManagerPluginDescriptor(String edgeManagerPluginClassName) { + super(edgeManagerPluginClassName); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java index 6f0c66d..6ed3c92 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java @@ -96,7 +96,7 @@ public class EdgeProperty { final SchedulingType schedulingType; final InputDescriptor inputDescriptor; final OutputDescriptor outputDescriptor; - final EdgeManagerDescriptor edgeManagerDescriptor; + final EdgeManagerPluginDescriptor edgeManagerDescriptor; /** * @param dataMovementType @@ -135,7 +135,7 @@ public class EdgeProperty { * @param edgeDestination * The {@link InputDescriptor} which will consume data from the edge. */ - public EdgeProperty(EdgeManagerDescriptor edgeManagerDescriptor, + public EdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor, DataSourceType dataSourceType, SchedulingType schedulingType, OutputDescriptor edgeSource, @@ -176,10 +176,10 @@ public class EdgeProperty { /** * Returns the Edge Manager specifications for this edge. - * @return @link {@link EdgeManagerDescriptor} if a custom edge was setup, null otherwise. + * @return @link {@link EdgeManagerPluginDescriptor} if a custom edge was setup, null otherwise. 
*/ @Private - public EdgeManagerDescriptor getEdgeManagerDescriptor() { + public EdgeManagerPluginDescriptor getEdgeManagerDescriptor() { return edgeManagerDescriptor; } http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java index 6aec78e..40e2cc0 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java @@ -37,13 +37,13 @@ public abstract class VertexManagerPlugin { private final VertexManagerPluginContext context; /** - * Crete an instance of the VertexManagerPlugin. Classes extending this one to create a - * VertexManagerPlugin, must provide the same constructor so that Tez can create an instance of - * the class at runtime. - * - * @param context vertex manager plugin context which can be used to access the payload, - * vertex - * properties, etc + * Crete an instance of the VertexManagerPlugin. Classes extending this to + * create a VertexManagerPlugin, must provide the same constructor so that Tez + * can create an instance of the class at runtime. 
+ * + * @param context + * vertex manager plugin context which can be used to access the + * payload, vertex properties, etc */ public VertexManagerPlugin(VertexManagerPluginContext context) { this.context = context; http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java index 7c48adc..8a29691 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java @@ -30,8 +30,8 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; -import org.apache.tez.runtime.api.RootInputSpecUpdate; -import org.apache.tez.runtime.api.events.RootInputDataInformationEvent; +import org.apache.tez.runtime.api.InputSpecUpdate; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; import com.google.common.base.Preconditions; @@ -127,8 +127,8 @@ public interface VertexManagerPluginContext { */ public boolean setVertexParallelism(int parallelism, @Nullable VertexLocationHint locationHint, - @Nullable Map sourceEdgeManagers, - @Nullable Map rootInputSpecUpdate); + @Nullable Map sourceEdgeManagers, + @Nullable Map rootInputSpecUpdate); /** * Allows a VertexManagerPlugin to assign Events for Root Inputs @@ -143,7 +143,7 @@ public interface VertexManagerPluginContext { * the Vertex. The target index on individual events represents the * task to which events need to be sent. 
*/ - public void addRootInputEvents(String inputName, Collection events); + public void addRootInputEvents(String inputName, Collection events); /** * Notify the vertex to start the given tasks http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java index 173952e..0af498d 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java @@ -27,24 +27,24 @@ import org.apache.hadoop.classification.InterfaceAudience; @InterfaceAudience.Public public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor, LogicalIOProcessorFrameworkInterface { - private final TezProcessorContext context; + private final ProcessorContext context; /** * Constructor an instance of the LogicalProcessor. Classes extending this one to create a * LogicalProcessor, must provide the same constructor so that Tez can create an instance of the * class at runtime. * - * @param context the {@link org.apache.tez.runtime.api.TezProcessorContext} which provides + * @param context the {@link org.apache.tez.runtime.api.ProcessorContext} which provides * the Processor with context information within the running task. 
*/ - public AbstractLogicalIOProcessor(TezProcessorContext context) { + public AbstractLogicalIOProcessor(ProcessorContext context) { this.context = context; } @Override public abstract void initialize() throws Exception; - public final TezProcessorContext getContext() { + public final ProcessorContext getContext() { return context; } http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java index 8b6edda..d1facc7 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java @@ -29,7 +29,7 @@ import java.util.List; * * Input classes must provide a 2 argument public constructor for Tez to create the * Input. The parameters to this constructor are 1) an instance of - * {@link org.apache.tez.runtime.api.TezInputContext} and 2) an integer which is used to + * {@link org.apache.tez.runtime.api.InputContext} and 2) an integer which is used to * setup the number of physical inputs that the logical input will see. * Tez will take care of initializing and closing the Input after a {@link Processor} completes.

*

@@ -38,19 +38,19 @@ import java.util.List; public abstract class AbstractLogicalInput implements LogicalInput, LogicalInputFrameworkInterface { private final int numPhysicalInputs; - private final TezInputContext inputContext; + private final InputContext inputContext; /** * Constructor an instance of the LogicalInput. Classes extending this one to create a * LogicalInput, must provide the same constructor so that Tez can create an instance of the * class at runtime. * - * @param inputContext the {@link org.apache.tez.runtime.api.TezInputContext} which provides + * @param inputContext the {@link org.apache.tez.runtime.api.InputContext} which provides * the Input with context information within the running task. * @param numPhysicalInputs the number of physical inputs that the logical input will * receive. This is typically determined by Edge Routing. */ - public AbstractLogicalInput(TezInputContext inputContext, int numPhysicalInputs) { + public AbstractLogicalInput(InputContext inputContext, int numPhysicalInputs) { this.inputContext = inputContext; this.numPhysicalInputs = numPhysicalInputs; } @@ -69,12 +69,12 @@ public abstract class AbstractLogicalInput implements LogicalInput, LogicalInput } /** - * Return ahe {@link org.apache.tez.runtime.api.TezInputContext} for this specific instance of + * Return ahe {@link org.apache.tez.runtime.api.InputContext} for this specific instance of * the LogicalInput * - * @return the {@link org.apache.tez.runtime.api.TezInputContext} for the input + * @return the {@link org.apache.tez.runtime.api.InputContext} for the input */ - public final TezInputContext getContext() { + public final InputContext getContext() { return inputContext; } } http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java 
b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java index d88e57f..317390a 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalOutput.java @@ -31,20 +31,20 @@ import java.util.List; public abstract class AbstractLogicalOutput implements LogicalOutput, LogicalOutputFrameworkInterface { private final int numPhysicalOutputs; - private final TezOutputContext outputContext; + private final OutputContext outputContext; /** * Constructor an instance of the LogicalOutput. Classes extending this one to create a * LogicalOutput, must provide the same constructor so that Tez can create an instance of the * class at runtime. * - * @param outputContext the {@link org.apache.tez.runtime.api.TezOutputContext} which + * @param outputContext the {@link org.apache.tez.runtime.api.OutputContext} which * provides * the Output with context information within the running task. * @param numPhysicalOutputs the number of physical outputs that the logical output will * generate. This is typically determined by Edge Routing. 
*/ - public AbstractLogicalOutput(TezOutputContext outputContext, int numPhysicalOutputs) { + public AbstractLogicalOutput(OutputContext outputContext, int numPhysicalOutputs) { this.outputContext = outputContext; this.numPhysicalOutputs = numPhysicalOutputs; } @@ -62,12 +62,12 @@ public abstract class AbstractLogicalOutput implements LogicalOutput, LogicalOut } /** - * Return ahe {@link org.apache.tez.runtime.api.TezOutputContext} for this specific instance of + * Return ahe {@link org.apache.tez.runtime.api.OutputContext} for this specific instance of * the LogicalOutput * - * @return the {@link org.apache.tez.runtime.api.TezOutputContext} for the output + * @return the {@link org.apache.tez.runtime.api.OutputContext} for the output */ - public final TezOutputContext getContext() { + public final OutputContext getContext() { return outputContext; } } http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java new file mode 100644 index 0000000..1961184 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputContext.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api; + +/** + * Context handle for the Input to initialize itself. + * This interface is not supposed to be implemented by users + */ +public interface InputContext extends TaskContext { + + /** + * Get the Vertex Name of the Source that generated data for this Input + * @return Name of the Source Vertex + */ + public String getSourceVertexName(); + + /** + * Get the index of the input in the set of all inputs for the task. The + * index will be consistent and valid only among the tasks of this vertex. + * @return index + */ + public int getInputIndex(); + + /** + * Inform the framework that the specific Input is ready for consumption. + * + * This method can be invoked multiple times. + */ + public void inputIsReady(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java index 5192252..25b37b8 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputFrameworkInterface.java @@ -31,13 +31,13 @@ import java.util.List; *

* * During initialization, Inputs must specify an initial memory requirement via - * {@link TezInputContext}.requestInitialMemory + * {@link InputContext}.requestInitialMemory *

* * * Inputs must also inform the framework once they are ready to be consumed. * This typically means that the Processor will not block when reading from the - * corresponding Input. This is done via {@link TezInputContext}.inputIsReady. + * corresponding Input. This is done via {@link InputContext}.inputIsReady. * Inputs choose the policy on when they are ready. */ public interface InputFrameworkInterface { http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java new file mode 100644 index 0000000..a86dc22 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializer.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.runtime.api; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.tez.runtime.api.events.InputInitializerEvent; + +/** + * InputInitializers are typically used to initialize vertices + * connected to data sources. They run in the App Master and can be used to + * distribute data across the tasks for the vertex, determine the number of + * tasks at runtime, update the Input payload etc. + */ +@InterfaceStability.Unstable +public abstract class InputInitializer { + + private final InputInitializerContext initializerContext; + + /** + * Construct an instance of the InputInitializer. Classes extending this to create an + * InputInitializer, must provide the same constructor so that Tez can create an instance of + * the class at runtime. + * + * @param initializerContext initializer context which can be used to access the payload, vertex + * properties, etc + */ + public InputInitializer(InputInitializerContext initializerContext) { + this.initializerContext = initializerContext; + } + + /** + * Run the initializer. This is the main method where + * initialization takes place. If an Initializer is written to accept events, + * a notification mechanism should be set up, with the heavy lifting of + * processing the event being done via this method. The moment this method + * returns a list of events, input initialization is considered to be + * complete. + * + * @return a list of events which are eventually routed to a + * {@link org.apache.tez.dag.api.VertexManagerPlugin} for routing + * @throws Exception + */ + public abstract List initialize() + throws Exception; + + /** + * Handle events meant for the specific Initializer. This is a notification mechanism to inform + * the Initializer about events received. Extensive event processing should not be performed via + * this method call.
Instead this should just be used as a notification method to the main + * initialization via the initialize method. + * + * @param events list of events + * @throws Exception + */ + public abstract void handleInputInitializerEvent(List events) + throws Exception; + + /** + * Return the {@link org.apache.tez.runtime.api.InputInitializerContext} + * for this specific instance of the Initializer. + * + * @return the {@link org.apache.tez.runtime.api.InputInitializerContext} + * for the initializer + */ + public final InputInitializerContext getContext() { + return this.initializerContext; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java new file mode 100644 index 0000000..67b6732 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputInitializerContext.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tez.runtime.api; + +import javax.annotation.Nullable; + +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; + +/** + * A context that provides information to the {@link InputInitializer} + */ +@Unstable +public interface InputInitializerContext { + + /** + * Get the YARN application id given to the Tez Application Master + * @return Application id + */ + ApplicationId getApplicationId(); + + /** + * Get the name of the DAG + * @return DAG name + */ + String getDAGName(); + + /** + * Get the name of the input + * @return Input name + */ + String getInputName(); + + /** + * Get the user payload for the input + * @return User payload + */ + @Nullable byte[] getInputUserPayload(); + + /** + * Get the user payload for the initializer + * @return User payload + */ + @Nullable byte[] getUserPayload(); + + /** + * Get the number of tasks in this vertex. May be -1 if the vertex has not been + * initialized with a pre-determined number of tasks. + * @return number of tasks + */ + int getNumTasks(); + + /** + * Get the resource allocated to a task of this vertex + * @return Resource + */ + Resource getVertexTaskResource(); + + /** + * Get the total resource allocated to this vertex. If the DAG is running in + * a busy cluster then it may have no resources available dedicated to it. The + * DAG may divide its resources among member vertices.
+ * @return Resource + */ + Resource getTotalAvailableResource(); + + /** + * Get the number of nodes in the cluster + * @return Number of nodes + */ + int getNumClusterNodes(); + + /** + * @return DAG Attempt number + */ + int getDAGAttemptNumber(); + + /** + * Get the number of tasks in the given vertex + * @param vertexName + * @return Total number of tasks in this vertex + */ + public int getVertexNumTasks(String vertexName); + +} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/InputSpecUpdate.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/InputSpecUpdate.java b/tez-api/src/main/java/org/apache/tez/runtime/api/InputSpecUpdate.java new file mode 100644 index 0000000..0b86ae9 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/InputSpecUpdate.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.runtime.api; + +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; + +import com.google.common.collect.Lists; + +/** + * Update Input specs for Inputs running in a task. Allows setting the number of physical + * inputs for all work units if they have the same number of physical inputs, or individual + * numPhysicalInputs for each work unit. + * + */ +@Unstable +public class InputSpecUpdate { + + private final boolean forAllWorkUnits; + private final List numPhysicalInputs; + + private final static InputSpecUpdate DEFAULT_SINGLE_PHYSICAL_INPUT_SPEC = createAllTaskInputSpecUpdate(1); + + /** + * Create an update instance where all work units (typically represented by + * {@link InputDataInformationEvent}) will have the same number of physical inputs. + * + * @param numPhysicalInputs + * the number of physical inputs for all work units which will use the LogicalInput + * @return + */ + public static InputSpecUpdate createAllTaskInputSpecUpdate(int numPhysicalInputs) { + return new InputSpecUpdate(numPhysicalInputs); + } + + /** + * Create an update instance where each work unit (typically represented by an + * {@link InputDataInformationEvent}) has its own individual number of physical inputs. + * + * @param perWorkUnitNumPhysicalInputs + * A list containing one entry per work unit. The order in the list corresponds to task + * index or equivalently the order of {@link InputDataInformationEvent}s being sent.
+ * @return + */ + public static InputSpecUpdate createPerTaskInputSpecUpdate( + List perWorkUnitNumPhysicalInputs) { + return new InputSpecUpdate(perWorkUnitNumPhysicalInputs); + } + + public static InputSpecUpdate getDefaultSinglePhysicalInputSpecUpdate() { + return DEFAULT_SINGLE_PHYSICAL_INPUT_SPEC; + } + + private InputSpecUpdate(int numPhysicalInputs) { + this.forAllWorkUnits = true; + this.numPhysicalInputs = Lists.newArrayList(numPhysicalInputs); + } + + private InputSpecUpdate(List perWorkUnitNumPhysicalInputs) { + this.forAllWorkUnits = false; + this.numPhysicalInputs = Lists.newArrayList(perWorkUnitNumPhysicalInputs); + } + + @Private + public int getNumPhysicalInputsForWorkUnit(int index) { + if (this.forAllWorkUnits) { + return numPhysicalInputs.get(0); + } else { + return numPhysicalInputs.get(index); + } + } + + @Private + /* Used for recovery serialization */ + public boolean isForAllWorkUnits() { + return this.forAllWorkUnits; + } + + @Private + /* Used for recovery serialization */ + public List getAllNumPhysicalInputs() { + return numPhysicalInputs; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java new file mode 100644 index 0000000..0d8ea61 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedInputContext.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api; + +import javax.annotation.Nullable; + +/** + * Context for {@link MergedLogicalInput} + * This interface is not supposed to be implemented by users + */ +public interface MergedInputContext { + + @Nullable + public byte[] getUserPayload(); + + /** + * Inform the framework that the specific Input is ready for consumption. + * + * This method can be invoked multiple times. + */ + public void inputIsReady(); + + /** + * Get the work directories for the Input + * @return an array of work dirs + */ + public String[] getWorkDirs(); +} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java index 656bb2c..fd698bd 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean; *

* MergedLogicalInput implementations must provide a 2 argument public constructor for * Tez to create the Input. The parameters to this constructor are 1) an instance of {@link - * org.apache.tez.runtime.api.TezMergedInputContext} and 2) a list of constituent inputs. Tez will + * org.apache.tez.runtime.api.MergedInputContext} and 2) a list of constituent inputs. Tez will * take care of initializing and closing the Input after a {@link Processor} completes.

*

*/ @@ -40,7 +40,7 @@ public abstract class MergedLogicalInput implements LogicalInput { private AtomicBoolean notifiedInputReady = new AtomicBoolean(false); private List inputs; private final AtomicBoolean isStarted = new AtomicBoolean(false); - private final TezMergedInputContext context; + private final MergedInputContext context; /** * Constructor an instance of the MergedLogicalInputs. Classes extending this one to create a @@ -48,11 +48,11 @@ public abstract class MergedLogicalInput implements LogicalInput { * the * class at runtime. * - * @param context the {@link org.apache.tez.runtime.api.TezMergedInputContext} which provides + * @param context the {@link org.apache.tez.runtime.api.MergedInputContext} which provides * the Input with context information within the running task. * @param inputs the list of constituen Inputs. */ - public MergedLogicalInput(TezMergedInputContext context, List inputs) { + public MergedLogicalInput(MergedInputContext context, List inputs) { this.inputs = Collections.unmodifiableList(inputs); this.context = context; } @@ -61,7 +61,7 @@ public abstract class MergedLogicalInput implements LogicalInput { return inputs; } - public final TezMergedInputContext getContext() { + public final MergedInputContext getContext() { return context; } http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java index 7e21345..0b40695 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitter.java @@ -28,10 +28,10 @@ import org.apache.tez.dag.api.client.VertexStatus; @InterfaceStability.Unstable public abstract class OutputCommitter { - private final 
OutputCommitterContext outputCommitterContext; + private final OutputCommitterContext committerContext; /** - * Constructor an instance of the OutputCommitter. Classes extending this one to create an + * Constructor an instance of the OutputCommitter. Classes extending this to create a * OutputCommitter, must provide the same constructor so that Tez can create an instance of * the class at runtime. * @@ -39,7 +39,7 @@ public abstract class OutputCommitter { * properties, etc */ public OutputCommitter(OutputCommitterContext committerContext) { - this.outputCommitterContext = committerContext; + this.committerContext = committerContext; } /** @@ -108,7 +108,7 @@ public abstract class OutputCommitter { * @return the {@link org.apache.tez.runtime.api.OutputCommitterContext} for the input */ public final OutputCommitterContext getContext() { - return this.outputCommitterContext; + return this.committerContext; } } http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java index 7189c2e..05c2fd7 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputCommitterContext.java @@ -20,15 +20,15 @@ package org.apache.tez.runtime.api; import javax.annotation.Nullable; -import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; /** * Context through which the OutputCommitter can access all the relevant - * information that it needs. + * information that it needs. 
This interface is not supposed to be implemented + * by users */ - +@Unstable public interface OutputCommitterContext { /** @@ -77,8 +77,6 @@ public interface OutputCommitterContext { * Get Vertex Index in the DAG * @return Vertex index */ - @Unstable - @Evolving public int getVertexIndex(); } http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java new file mode 100644 index 0000000..d5796e2 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputContext.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api; + +/** + * Context handle for the Output to initialize itself. 
+ * This interface is not supposed to be implemented by users + */ +public interface OutputContext extends TaskContext { + + /** + * Get the Vertex Name of the Destination that is the recipient of this + * Output's data + * @return Name of the Destination Vertex + */ + public String getDestinationVertexName(); + + /** + * Get the index of the output in the set of all outputs for the task. The + * index will be consistent and valid only among the tasks of this vertex. + * @return index + */ + public int getOutputIndex(); + +} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java index 873fa0c..7f64f3e 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/OutputFrameworkInterface.java @@ -30,7 +30,7 @@ import java.util.List; *

* * During initialization, Outputs must specify an initial memory requirement via - * {@link TezOutputContext}.requestInitialMemory + * {@link OutputContext}.requestInitialMemory *

* */ http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java new file mode 100644 index 0000000..6a28e1d --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorContext.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api; + +import java.io.IOException; +import java.util.Collection; + +/** + * Context handle for the Processor to initialize itself. 
+ * This interface is not supposed to be implemented by users + */ +public interface ProcessorContext extends TaskContext { + + /** + * Set the overall progress of this Task Attempt + * @param progress Progress in the range from [0.0 - 1.0f] + */ + public void setProgress(float progress); + + /** + * Check whether this attempt can commit its output + * @return true if commit allowed + * @throws IOException + */ + public boolean canCommit() throws IOException; + + /** + * Blocking call which returns when any of the specified Inputs is ready for + * consumption. + * + * There can be multiple parallel invocations of this function - where each + * invocation blocks on the Inputs that it specifies. + * + * If multiple Inputs are ready, any one of them may be returned by this + * method - including an Input which may have been returned in a previous + * call. If invoking this method multiple times, it's recommended to remove + * previously completed Inputs from the invocation list. + * + * @param inputs + * the list of Inputs to monitor + * @return the Input which is ready for consumption + * @throws InterruptedException + */ + public Input waitForAnyInputReady(Collection inputs) throws InterruptedException; + + /** + * Blocking call which returns only after all of the specified Inputs are + * ready for consumption. + * + * There can be multiple parallel invocations of this function - where each + * invocation blocks on the Inputs that it specifies. 
+ * + * @param inputs + * the list of Inputs to monitor + * @throws InterruptedException + */ + public void waitForAllInputsReady(Collection inputs) throws InterruptedException; +} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/RootInputSpecUpdate.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/RootInputSpecUpdate.java b/tez-api/src/main/java/org/apache/tez/runtime/api/RootInputSpecUpdate.java deleted file mode 100644 index 72adf78..0000000 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/RootInputSpecUpdate.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.runtime.api; - -import java.util.List; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.tez.runtime.api.events.RootInputDataInformationEvent; - -import com.google.common.collect.Lists; - -/** - * Update Input specs for Root Inputs running in a task. 
Allows setting the number of physical - * inputs for all work units if they have the same number of physical inputs, or individual - * numPhysicalInputs for each work unit. - * - */ -public class RootInputSpecUpdate { - - private final boolean forAllWorkUnits; - private final List numPhysicalInputs; - - private final static RootInputSpecUpdate DEFAULT_SINGLE_PHYSICAL_INPUT_SPEC = createAllTaskRootInputSpecUpdate(1); - - /** - * Create an update instance where all work units (typically represented by - * {@link RootInputDataInformationEvent}) will have the same number of physical inputs. - * - * @param numPhysicalInputs - * the number of physical inputs for all work units which will use the LogicalInput - * @return - */ - public static RootInputSpecUpdate createAllTaskRootInputSpecUpdate(int numPhysicalInputs) { - return new RootInputSpecUpdate(numPhysicalInputs); - } - - /** - * Create an update instance where all work units (typically represented by - * {@link RootInputDataInformationEvent}) will have the same number of physical inputs. - * - * @param perWorkUnitNumPhysicalInputs - * A list containing one entry per work unit. The order in the list corresponds to task - * index or equivalently the order of RootInputDataInformationEvents being sent. 
- * @return - */ - public static RootInputSpecUpdate createPerTaskRootInputSpecUpdate( - List perWorkUnitNumPhysicalInputs) { - return new RootInputSpecUpdate(perWorkUnitNumPhysicalInputs); - } - - public static RootInputSpecUpdate getDefaultSinglePhysicalInputSpecUpdate() { - return DEFAULT_SINGLE_PHYSICAL_INPUT_SPEC; - } - - private RootInputSpecUpdate(int numPhysicalInputs) { - this.forAllWorkUnits = true; - this.numPhysicalInputs = Lists.newArrayList(numPhysicalInputs); - } - - private RootInputSpecUpdate(List perWorkUnitNumPhysicalInputs) { - this.forAllWorkUnits = false; - this.numPhysicalInputs = Lists.newArrayList(perWorkUnitNumPhysicalInputs); - } - - @Private - public int getNumPhysicalInputsForWorkUnit(int index) { - if (this.forAllWorkUnits) { - return numPhysicalInputs.get(0); - } else { - return numPhysicalInputs.get(index); - } - } - - @Private - /* Used for recovery serialization */ - public boolean isForAllWorkUnits() { - return this.forAllWorkUnits; - } - - @Private - /* Used for recovery serialization */ - public List getAllNumPhysicalInputs() { - return numPhysicalInputs; - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java new file mode 100644 index 0000000..56c5cfd --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TaskContext.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.api; + +import java.nio.ByteBuffer; +import java.util.List; + +import javax.annotation.Nullable; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; + +/** + * Base interface for Context classes used to initialize the Input, Output + * and Processor instances. + * This interface is not supposed to be implemented by users + */ +public interface TaskContext { + // Scale the maximum events we fetch per RPC call to mitigate OOM issues + // on the ApplicationMaster when a thundering herd of reducers fetch events + // This should not be necessary after HADOOP-8942. TEZ-1398 + + /** + * Get the {@link ApplicationId} for the running app + * @return the {@link ApplicationId} + */ + public ApplicationId getApplicationId(); + + /** + * Get the current DAG Attempt Number + * @return DAG Attempt Number + */ + public int getDAGAttemptNumber(); + + /** + * Get the index of this Task among the tasks of this vertex + * @return Task Index + */ + public int getTaskIndex(); + + /** + * Get the current Task Attempt Number + * @return Task Attempt Number + */ + public int getTaskAttemptNumber(); + + /** + * Get the name of the DAG + * @return the DAG name + */ + public String getDAGName(); + + /** + * Get the name of the Vertex in which the task is running + * @return Vertex Name + */ + public String getTaskVertexName(); + + /** + * Get the index of this task's vertex in the set of vertices in the DAG. 
This + * is consistent and valid across all tasks/vertices in the same DAG. + * @return index + */ + public int getTaskVertexIndex(); + + public TezCounters getCounters(); + + /** + * Send Events to the AM and/or dependent Vertices + * @param events Events to be sent + */ + public void sendEvents(List events); + + /** + * Get the User Payload for the Input/Output/Processor + * @return User Payload + */ + @Nullable + public byte[] getUserPayload(); + + /** + * Get the work directories for the Input/Output/Processor + * @return an array of work dirs + */ + public String[] getWorkDirs(); + + /** + * Returns an identifier which is unique to the specific Input, Processor or + * Output + * + * @return a unique identifier + */ + public String getUniqueIdentifier(); + + /** + * Returns a shared {@link ObjectRegistry} to hold user objects in memory + * between tasks. + * @return {@link ObjectRegistry} + */ + public ObjectRegistry getObjectRegistry(); + + /** + * Report a fatal error to the framework. This will cause the entire task to + * fail and should not be used for reporting temporary or recoverable errors + * + * @param exception an exception representing the error + */ + public void fatalError(@Nullable Throwable exception, @Nullable String message); + + /** + * Returns meta-data for the specified service. 
As an example, when the MR + * ShuffleHandler is used - this would return the jobToken serialized as bytes + * + * @param serviceName + * the name of the service for which meta-data is required + * @return a ByteBuffer representing the meta-data + */ + public ByteBuffer getServiceConsumerMetaData(String serviceName); + + /** + * Return Provider meta-data for the specified service As an example, when the + * MR ShuffleHandler is used - this would return the shuffle port serialized + * as bytes + * + * @param serviceName + * the name of the service for which provider meta-data is required + * @return a ByteBuffer representing the meta-data + */ + @Nullable + public ByteBuffer getServiceProviderMetaData(String serviceName); + + /** + * Request a specific amount of memory during initialization + * (initialize(..*Context)) The requester is notified of allocation via the + * provided callback handler. + * + * Currently, (post TEZ-668) the caller will be informed about the available + * memory after initialization (I/P/O initialize(...)), and before the + * start/run invocation. There will be no other invocations on the callback. + * + * This method can be called only once by any component. Calling it multiple + * times from within the same component will result in an error. + * + * Each Input / Output must request memory. For Inputs / Outputs which do not + * have a specific ask, a null callback handler can be specified with a + * request size of 0. + * + * @param size + * request size in bytes. + * @param callbackHandler + * the callback handler to be invoked once memory is assigned + */ + public void requestInitialMemory(long size, MemoryUpdateCallback callbackHandler); + + /** + * Gets the total memory available to all components of the running task. This + * values will always be constant, and does not factor in any allocations. 
+ * + * @return the total available memory for all components of the task + */ + public long getTotalMemoryAvailableToTask(); + +} http://git-wip-us.apache.org/repos/asf/tez/blob/27e5b6c6/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java deleted file mode 100644 index 1f40ed9..0000000 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezInputContext.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.runtime.api; - -/** - * Context handle for the Input to initialize itself. - */ -public interface TezInputContext extends TezTaskContext { - - /** - * Get the Vertex Name of the Source that generated data for this Input - * @return Name of the Source Vertex - */ - public String getSourceVertexName(); - - /** - * Get the index of the input in the set of all inputs for the task. The - * index will be consistent and valid only among the tasks of this vertex. 
- * @return index - */ - public int getInputIndex(); - - /** - * Inform the framework that the specific Input is ready for consumption. - * - * This method can be invoked multiple times. - */ - public void inputIsReady(); -}