From: sewen@apache.org
To: commits@flink.incubator.apache.org
Reply-To: dev@flink.incubator.apache.org
Date: Sun, 21 Sep 2014 02:12:46 -0000
Message-Id: <9e4a18a09f184ed398afa76b65cdef08@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [22/63] [abbrv] git commit: Unify all job vertices to one type (rather than dedicated input/output types)

Unify all job vertices to one type (rather than dedicated input/output types)

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/cb7039e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/cb7039e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/cb7039e3

Branch: refs/heads/master
Commit: cb7039e3e171474a7635a73dda9c086c84966dd0
Parents: e6aadfc
Author: Stephan Ewen
Authored: Sun Jun 22 19:05:02 2014 +0200
Committer: Stephan Ewen
Committed: Sat Sep 20 20:02:48 2014 +0200

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java |  25 ++-
 .../jobgraph/AbstractJobOutputVertex.java       |   2 -
 .../runtime/jobgraph/AbstractJobVertex.java     |  38 +----
 .../jobgraph/InputFormatInputVertex.java        | 103 ++++++++++++
 .../apache/flink/runtime/jobgraph/JobEdge.java  |   2 -
 .../apache/flink/runtime/jobgraph/JobGraph.java | 162 +++----------------
 .../flink/runtime/jobgraph/JobInputVertex.java  | 103 ------------
 .../flink/runtime/jobgraph/JobOutputVertex.java |  84 ----------
 .../jobgraph/OutputFormatOutputVertex.java      |  83 ++++++++++
 .../runtime/jobgraph/SimpleInputVertex.java     |  61 +++++++
 .../runtime/jobgraph/SimpleOutputVertex.java    |  53 ++++++
 .../jobgraph/tasks/AbstractInvokable.java       |  12 +-
 .../executiongraph/SelfCrossForwardTask.java    |   2 -
 .../BroadcastVarsNepheleITCase.java             |  22 +--
 .../KMeansIterativeNepheleITCase.java           |  31 ++--
.../ConnectedComponentsNepheleITCase.java | 67 ++++---- .../IterationWithChainingNepheleITCase.java | 13 +- .../test/iterative/nephele/JobGraphUtils.java | 23 +-- .../CustomCompensatableDanglingPageRank.java | 16 +- ...mpensatableDanglingPageRankWithCombiner.java | 17 +- .../CompensatableDanglingPageRank.java | 15 +- .../test/runtime/NetworkStackThroughput.java | 8 +- 22 files changed, 449 insertions(+), 493 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java index 043a0a7..cb912de 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java @@ -66,11 +66,12 @@ import org.apache.flink.runtime.iterative.task.IterationTailPactTask; import org.apache.flink.runtime.jobgraph.AbstractJobOutputVertex; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.InputFormatInputVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException; -import org.apache.flink.runtime.jobgraph.JobInputVertex; -import org.apache.flink.runtime.jobgraph.JobOutputVertex; import org.apache.flink.runtime.jobgraph.JobTaskVertex; +import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex; +import org.apache.flink.runtime.jobgraph.SimpleOutputVertex; import org.apache.flink.runtime.operators.CoGroupDriver; import org.apache.flink.runtime.operators.CoGroupWithSolutionSetFirstDriver; import org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver; @@ -808,8 +809,8 @@ public class NepheleJobGraphGenerator implements Visitor { return vertex; } - private JobInputVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException { - final JobInputVertex vertex = new JobInputVertex(node.getNodeName(), this.jobGraph); + private AbstractJobVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException { + final InputFormatInputVertex vertex = new InputFormatInputVertex(node.getNodeName(), this.jobGraph); final TaskConfig config = new TaskConfig(vertex.getConfiguration()); vertex.setInvokableClass(DataSourceTask.class); @@ -823,7 +824,7 @@ public class NepheleJobGraphGenerator implements Visitor { } private AbstractJobOutputVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException { - final JobOutputVertex vertex = new JobOutputVertex(node.getNodeName(), this.jobGraph); + final OutputFormatOutputVertex vertex = new OutputFormatOutputVertex(node.getNodeName(), this.jobGraph); final TaskConfig config = new TaskConfig(vertex.getConfiguration()); vertex.setInvokableClass(DataSinkTask.class); @@ -1138,8 +1139,7 @@ public class NepheleJobGraphGenerator implements Visitor { headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel); // --------------------------- create the sync task --------------------------- - final JobOutputVertex sync = new JobOutputVertex("Sync(" + - 
bulkNode.getNodeName() + ")", this.jobGraph); + final SimpleOutputVertex sync = new SimpleOutputVertex("Sync(" + bulkNode.getNodeName() + ")", this.jobGraph); sync.setInvokableClass(IterationSynchronizationSinkTask.class); sync.setNumberOfSubtasks(1); this.auxVertices.add(sync); @@ -1194,7 +1194,7 @@ public class NepheleJobGraphGenerator implements Visitor { tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); // create the fake output task - JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph); + SimpleOutputVertex fakeTail = new SimpleOutputVertex("Fake Tail", this.jobGraph); fakeTail.setInvokableClass(FakeOutputTask.class); fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks()); this.auxVertices.add(fakeTail); @@ -1236,7 +1236,7 @@ public class NepheleJobGraphGenerator implements Visitor { tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel()); tailConfigOfTerminationCriterion.addOutputShipStrategy(ShipStrategyType.FORWARD); - JobOutputVertex fakeTailTerminationCriterion = new JobOutputVertex("Fake Tail for Termination Criterion", this.jobGraph); + SimpleOutputVertex fakeTailTerminationCriterion = new SimpleOutputVertex("Fake Tail for Termination Criterion", this.jobGraph); fakeTailTerminationCriterion.setInvokableClass(FakeOutputTask.class); fakeTailTerminationCriterion.setNumberOfSubtasks(headVertex.getNumberOfSubtasks()); this.auxVertices.add(fakeTailTerminationCriterion); @@ -1303,8 +1303,7 @@ public class NepheleJobGraphGenerator implements Visitor { // --------------------------- create the sync task --------------------------- final TaskConfig syncConfig; { - final JobOutputVertex sync = new JobOutputVertex("Sync (" + - iterNode.getNodeName() + ")", this.jobGraph); + final SimpleOutputVertex sync = new SimpleOutputVertex("Sync (" + iterNode.getNodeName() + ")", this.jobGraph); sync.setInvokableClass(IterationSynchronizationSinkTask.class); sync.setNumberOfSubtasks(1); this.auxVertices.add(sync); @@ -1369,7 +1368,7 @@ public class NepheleJobGraphGenerator implements Visitor { worksetTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); // create the fake output task - JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph); + SimpleOutputVertex fakeTail = new SimpleOutputVertex("Fake Tail", this.jobGraph); fakeTail.setInvokableClass(FakeOutputTask.class); fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks()); this.auxVertices.add(fakeTail); @@ -1407,7 +1406,7 @@ public class NepheleJobGraphGenerator implements Visitor { solutionDeltaConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); // create the fake output task - JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph); + SimpleOutputVertex fakeTail = new SimpleOutputVertex("Fake Tail", this.jobGraph); fakeTail.setInvokableClass(FakeOutputTask.class); fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks()); this.auxVertices.add(fakeTail); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobOutputVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobOutputVertex.java index c1f0ec5..edb8d0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobOutputVertex.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobOutputVertex.java @@ -16,12 +16,10 @@ * limitations under the License. */ - package org.apache.flink.runtime.jobgraph; /** * An abstract base class for output vertices in Nephele. - * */ public abstract class AbstractJobOutputVertex extends AbstractJobVertex { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java index 08a9567..7df76c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java @@ -72,14 +72,9 @@ public abstract class AbstractJobVertex implements IOReadableWritable { private int numberOfSubtasks = -1; /** - * Number of retries in case of an error before the task represented by this vertex is considered as failed. - */ - private int numberOfExecutionRetries = -1; - - /** * Other task to share a (set of) of instances with at runtime. */ - private AbstractJobVertex vertexToShareInstancesWith = null; + private AbstractJobVertex vertexToShareInstancesWith; /** * Custom configuration passed to the assigned task at runtime. @@ -89,7 +84,7 @@ public abstract class AbstractJobVertex implements IOReadableWritable { /** * The class of the invokable. */ - protected Class invokableClass = null; + protected Class invokableClass; /** @@ -388,9 +383,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable { // Read number of subtasks this.numberOfSubtasks = in.readInt(); - // Number of execution retries - this.numberOfExecutionRetries = in.readInt(); - // Read vertex to share instances with if (in.readBoolean()) { final JobVertexID id = new JobVertexID(); @@ -464,9 +456,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable { // Number of subtasks out.writeInt(this.numberOfSubtasks); - // Number of execution retries - out.writeInt(this.numberOfExecutionRetries); - // Vertex to share instance with if (this.vertexToShareInstancesWith != null) { out.writeBoolean(true); @@ -538,29 +527,6 @@ public abstract class AbstractJobVertex implements IOReadableWritable { } /** - * Sets the number of retries in case of an error before the task represented by this vertex is considered as - * failed. - * - * @param numberOfExecutionRetries - * the number of retries in case of an error before the task represented by this vertex is considered as - * failed - */ - public void setNumberOfExecutionRetries(final int numberOfExecutionRetries) { - this.numberOfExecutionRetries = numberOfExecutionRetries; - } - - /** - * Returns the number of retries in case of an error before the task represented by this vertex is considered as - * failed. - * - * @return the number of retries in case of an error before the task represented by this vertex is considered as - * failed or -1 if unspecified - */ - public int getNumberOfExecutionRetries() { - return this.numberOfExecutionRetries; - } - - /** * Sets the vertex this vertex should share its instances with at runtime. 
* * @param vertex http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatInputVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatInputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatInputVertex.java new file mode 100644 index 0000000..f79264a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatInputVertex.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import java.io.IOException; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.runtime.operators.util.TaskConfig; + + +public class InputFormatInputVertex extends AbstractJobInputVertex { + + private InputFormat inputFormat; + + public InputFormatInputVertex(String name, JobVertexID id, JobGraph jobGraph) { + super(name, id, jobGraph); + } + + /** + * Creates a new job file input vertex with the specified name. + * + * @param name + * The name of the new job file input vertex. + * @param jobGraph + * The job graph this vertex belongs to. + */ + public InputFormatInputVertex(String name, JobGraph jobGraph) { + this(name, null, jobGraph); + } + + /** + * Creates a new job file input vertex. + * + * @param jobGraph + * The job graph this vertex belongs to. + */ + public InputFormatInputVertex(JobGraph jobGraph) { + this(null, jobGraph); + } + + public void setInputFormat(InputFormat format) { + this.inputFormat = format; + } + + public void initializeInputFormatFromTaskConfig(ClassLoader cl) { + TaskConfig cfg = new TaskConfig(getConfiguration()); + + UserCodeWrapper> wrapper = cfg.>getStubWrapper(cl); + + if (wrapper != null) { + this.inputFormat = wrapper.getUserCodeObject(InputFormat.class, cl); + this.inputFormat.configure(cfg.getStubParameters()); + } + } + + /** + * Gets the input split type class + * + * @return Input split type class + */ + @Override + public Class getInputSplitType() { + if (inputFormat == null){ + return InputSplit.class; + } + + return inputFormat.getInputSplitType(); + } + + /** + * Gets the input splits from the input format. 
+ * + * @param minNumSplits Number of minimal input splits + * @return Array of input splits + * @throws IOException + */ + @Override + public InputSplit[] getInputSplits(int minNumSplits) throws IOException { + if (inputFormat == null){ + return null; + } + + return inputFormat.createInputSplits(minNumSplits); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java index 0a5df3a..33b6576 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.jobgraph; import org.apache.flink.runtime.io.network.channels.ChannelType; @@ -24,7 +23,6 @@ import org.apache.flink.runtime.io.network.channels.ChannelType; /** * Objects of this class represent edges in the user's job graph. * The edges can be annotated by a specific channel and compression level. - * */ public class JobEdge { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 2040c8e..48d858a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -46,24 +47,13 @@ import org.apache.flink.util.ClassUtils; /** * A job graph represents an entire job in Nephele. A job graph must consists at least of one job vertex * and must be acyclic. - * */ public class JobGraph implements IOReadableWritable { /** - * List of input vertices included in this job graph. - */ - private Map inputVertices = new HashMap(); - - /** - * List of output vertices included in this job graph. - */ - private Map outputVertices = new HashMap(); - - /** * List of task vertices included in this job graph. */ - private Map taskVertices = new HashMap(); + private Map taskVertices = new LinkedHashMap(); /** * ID of this job. @@ -90,11 +80,8 @@ public class JobGraph implements IOReadableWritable { */ private static final int BUFFERSIZE = 8192; - /** - * Buffer for array of reachable job vertices - */ - private volatile AbstractJobVertex[] bufferedAllReachableJobVertices = null; - + // -------------------------------------------------------------------------------------------- + /** * Constructs a new job graph with a random job ID. 
*/ @@ -108,7 +95,7 @@ public class JobGraph implements IOReadableWritable { * @param jobName * the name for this job graph */ - public JobGraph(final String jobName) { + public JobGraph(String jobName) { this(); this.jobName = jobName; } @@ -128,65 +115,27 @@ public class JobGraph implements IOReadableWritable { * @return the configuration object for this job, or null if it is not set */ public Configuration getJobConfiguration() { - return this.jobConfiguration; } /** - * Adds a new input vertex to the job graph if it is not already included. - * - * @param inputVertex - * the new input vertex to be added - */ - public void addVertex(AbstractJobInputVertex inputVertex) { - if (!inputVertices.containsKey(inputVertex.getID())) { - inputVertices.put(inputVertex.getID(), inputVertex); - } - } - - /** * Adds a new task vertex to the job graph if it is not already included. * * @param taskVertex * the new task vertex to be added */ - public void addVertex(JobTaskVertex taskVertex) { - if (!taskVertices.containsKey(taskVertex.getID())) { - taskVertices.put(taskVertex.getID(), taskVertex); - } - } - - /** - * Adds a new output vertex to the job graph if it is not already included. - * - * @param outputVertex - * the new output vertex to be added - */ - public void addVertex(AbstractJobOutputVertex outputVertex) { - if (!outputVertices.containsKey(outputVertex.getID())) { - outputVertices.put(outputVertex.getID(), outputVertex); + public void addVertex(AbstractJobVertex vertex) { + final JobVertexID id = vertex.getID(); + AbstractJobVertex previous = taskVertices.put(id, vertex); + + // if we had a prior association, restore and throw an exception + if (previous != null) { + taskVertices.put(id, vertex); + throw new IllegalArgumentException("The JobGraph already contains a vertex with that id."); } } /** - * Returns the number of input vertices registered with the job graph. - * - * @return the number of input vertices registered with the job graph - */ - public int getNumberOfInputVertices() { - return this.inputVertices.size(); - } - - /** - * Returns the number of output vertices registered with the job graph. - * - * @return the number of output vertices registered with the job graph - */ - public int getNumberOfOutputVertices() { - return this.outputVertices.size(); - } - - /** * Returns the number of task vertices registered with the job graph. * * @return the number of task vertices registered with the job graph @@ -196,39 +145,12 @@ public class JobGraph implements IOReadableWritable { } /** - * Returns an iterator to iterate all input vertices registered with the job graph. - * - * @return an iterator to iterate all input vertices registered with the job graph - */ - public Iterator getInputVertices() { - - final Collection coll = this.inputVertices.values(); - - return coll.iterator(); - } - - /** - * Returns an iterator to iterate all output vertices registered with the job graph. - * - * @return an iterator to iterate all output vertices registered with the job graph - */ - public Iterator getOutputVertices() { - - final Collection coll = this.outputVertices.values(); - - return coll.iterator(); - } - - /** - * Returns an iterator to iterate all task vertices registered with the job graph. + * Returns an Iterable to iterate all vertices registered with the job graph. 
* - * @return an iterator to iterate all task vertices registered with the job graph + * @return an Iterable to iterate all vertices registered with the job graph */ - public Iterator getTaskVertices() { - - final Collection coll = this.taskVertices.values(); - - return coll.iterator(); + public Iterable getTaskVertices() { + return this.taskVertices.values(); } /** @@ -237,35 +159,7 @@ public class JobGraph implements IOReadableWritable { * @return the number of all job vertices registered with this job graph */ public int getNumberOfVertices() { - - return this.inputVertices.size() + this.outputVertices.size() + this.taskVertices.size(); - } - - /** - * Returns an array of all job vertices than can be reached when traversing the job graph from the input vertices. - * Each job vertex is contained only one time. - * - * @return an array of all job vertices than can be reached when traversing the job graph from the input vertices - */ - public AbstractJobVertex[] getAllReachableJobVertices() { - if(bufferedAllReachableJobVertices == null){ - final List collector = new ArrayList(); - final HashSet visited = new HashSet(); - - final Iterator inputs = getInputVertices(); - - while(inputs.hasNext()){ - AbstractJobVertex vertex = inputs.next(); - - if(!visited.contains(vertex.getID())){ - collectVertices(vertex, visited, collector); - } - } - - bufferedAllReachableJobVertices = collector.toArray(new AbstractJobVertex[0]); - } - - return bufferedAllReachableJobVertices; + return this.taskVertices.size(); } /** @@ -297,7 +191,8 @@ public class JobGraph implements IOReadableWritable { * @return an array of all job vertices that are registered with the job graph */ public AbstractJobVertex[] getAllJobVertices() { - + return this.taskVertices.values().toArray(new AbstractJobVertex[this.taskVertices.size()]); + int i = 0; final AbstractJobVertex[] vertices = new AbstractJobVertex[inputVertices.size() + outputVertices.size() + taskVertices.size()]; @@ -337,21 +232,8 @@ public class JobGraph implements IOReadableWritable { * the ID of the vertex to search for * @return the vertex with the matching ID or null if no vertex with such ID could be found */ - public AbstractJobVertex findVertexByID(final JobVertexID id) { - - if (this.inputVertices.containsKey(id)) { - return this.inputVertices.get(id); - } - - if (this.outputVertices.containsKey(id)) { - return this.outputVertices.get(id); - } - - if (this.taskVertices.containsKey(id)) { - return this.taskVertices.get(id); - } - - return null; + public AbstractJobVertex findVertexByID(JobVertexID id) { + return this.taskVertices.get(id); } /** http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobInputVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobInputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobInputVertex.java deleted file mode 100644 index bffb182..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobInputVertex.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.jobgraph; - -import java.io.IOException; - -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.operators.util.UserCodeWrapper; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.runtime.operators.util.TaskConfig; - -public class JobInputVertex extends AbstractJobInputVertex { - - private InputFormat inputFormat; - - public JobInputVertex(String name, JobVertexID id, JobGraph jobGraph) { - super(name, id, jobGraph); - } - - /** - * Creates a new job file input vertex with the specified name. - * - * @param name - * The name of the new job file input vertex. - * @param jobGraph - * The job graph this vertex belongs to. - */ - public JobInputVertex(String name, JobGraph jobGraph) { - this(name, null, jobGraph); - } - - /** - * Creates a new job file input vertex. - * - * @param jobGraph - * The job graph this vertex belongs to. - */ - public JobInputVertex(JobGraph jobGraph) { - this(null, jobGraph); - } - - public void setInputFormat(InputFormat format) { - this.inputFormat = format; - } - - public void initializeInputFormatFromTaskConfig(ClassLoader cl) { - TaskConfig cfg = new TaskConfig(getConfiguration()); - - UserCodeWrapper> wrapper = cfg.>getStubWrapper(cl); - - if (wrapper != null) { - this.inputFormat = wrapper.getUserCodeObject(InputFormat.class, cl); - this.inputFormat.configure(cfg.getStubParameters()); - } - } - - /** - * Gets the input split type class - * - * @return Input split type class - */ - @Override - public Class getInputSplitType() { - if (inputFormat == null){ - return InputSplit.class; - } - - return inputFormat.getInputSplitType(); - } - - /** - * Gets the input splits from the input format. - * - * @param minNumSplits Number of minimal input splits - * @return Array of input splits - * @throws IOException - */ - @Override - public InputSplit[] getInputSplits(int minNumSplits) throws IOException { - if (inputFormat == null){ - return null; - } - - return inputFormat.createInputSplits(minNumSplits); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobOutputVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobOutputVertex.java deleted file mode 100644 index 352d9b3..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobOutputVertex.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.jobgraph; - -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.common.operators.util.UserCodeWrapper; -import org.apache.flink.runtime.operators.util.TaskConfig; - -/** - * A JobOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed - * for Nephele tasks which sink data in a not further specified way. As every job output vertex, - * a JobOutputVertex must not have any further output. - */ -public class JobOutputVertex extends AbstractJobOutputVertex { - /** - * Contains the output format associated to this output vertex. It can be
null
. - */ - private OutputFormat outputFormat; - - - /** - * Creates a new job file output vertex with the specified name. - * - * @param name - * the name of the new job file output vertex - * @param jobGraph - * the job graph this vertex belongs to - */ - public JobOutputVertex(String name, JobGraph jobGraph) { - this(name, null, jobGraph); - } - - public JobOutputVertex(String name, JobVertexID id, JobGraph jobGraph) { - super(name, id, jobGraph); - } - - /** - * Creates a new job file input vertex. - * - * @param jobGraph - * the job graph this vertex belongs to - */ - public JobOutputVertex(JobGraph jobGraph) { - this(null, jobGraph); - } - - public void setOutputFormat(OutputFormat format) { - this.outputFormat = format; - } - - public void initializeOutputFormatFromTaskConfig(ClassLoader cl) { - TaskConfig cfg = new TaskConfig(getConfiguration()); - UserCodeWrapper> wrapper = cfg.>getStubWrapper(cl); - - if (wrapper != null) { - this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, cl); - this.outputFormat.configure(cfg.getStubParameters()); - } - } - - /** - * Returns the output format. It can also be
null. - * - * @return output format or null
- */ - public OutputFormat getOutputFormat() { return outputFormat; } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java new file mode 100644 index 0000000..08a03bc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.runtime.operators.util.TaskConfig; + +/** + * A JobOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed + * for Nephele tasks which sink data in a not further specified way. As every job output vertex, + * a JobOutputVertex must not have any further output. + */ +public class OutputFormatOutputVertex extends AbstractJobOutputVertex { + /** + * Contains the output format associated to this output vertex. It can be
null
. + */ + private OutputFormat outputFormat; + + + /** + * Creates a new job file output vertex with the specified name. + * + * @param name + * the name of the new job file output vertex + * @param jobGraph + * the job graph this vertex belongs to + */ + public OutputFormatOutputVertex(String name, JobGraph jobGraph) { + this(name, null, jobGraph); + } + + public OutputFormatOutputVertex(String name, JobVertexID id, JobGraph jobGraph) { + super(name, id, jobGraph); + } + + /** + * Creates a new job file input vertex. + * + * @param jobGraph + * the job graph this vertex belongs to + */ + public OutputFormatOutputVertex(JobGraph jobGraph) { + this(null, jobGraph); + } + + public void setOutputFormat(OutputFormat format) { + this.outputFormat = format; + } + + public void initializeOutputFormatFromTaskConfig(ClassLoader cl) { + TaskConfig cfg = new TaskConfig(getConfiguration()); + UserCodeWrapper> wrapper = cfg.>getStubWrapper(cl); + + if (wrapper != null) { + this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, cl); + this.outputFormat.configure(cfg.getStubParameters()); + } + } + + /** + * Returns the output format. It can also be
null. + * + * @return output format or null
+ */ + public OutputFormat getOutputFormat() { return outputFormat; } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java new file mode 100644 index 0000000..3699f0e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import org.apache.flink.core.io.InputSplit; + + +public class SimpleInputVertex extends AbstractJobInputVertex { + + /** + * Creates a new job file output vertex with the specified name. + * + * @param name + * the name of the new job file output vertex + * @param jobGraph + * the job graph this vertex belongs to + */ + public SimpleInputVertex(String name, JobGraph jobGraph) { + this(name, null, jobGraph); + } + + public SimpleInputVertex(String name, JobVertexID id, JobGraph jobGraph) { + super(name, id, jobGraph); + } + + /** + * Creates a new job file input vertex. + * + * @param jobGraph + * the job graph this vertex belongs to + */ + public SimpleInputVertex(JobGraph jobGraph) { + this(null, jobGraph); + } + + @Override + public Class getInputSplitType() { + return null; + } + + @Override + public InputSplit[] getInputSplits(int minNumSplits) throws Exception { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java new file mode 100644 index 0000000..8709a07 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +/** + * A JobOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed + * for Nephele tasks which sink data in a not further specified way. As every job output vertex, + * a JobOutputVertex must not have any further output. + */ +public class SimpleOutputVertex extends AbstractJobOutputVertex { + + /** + * Creates a new job file output vertex with the specified name. + * + * @param name + * the name of the new job file output vertex + * @param jobGraph + * the job graph this vertex belongs to + */ + public SimpleOutputVertex(String name, JobGraph jobGraph) { + this(name, null, jobGraph); + } + + public SimpleOutputVertex(String name, JobVertexID id, JobGraph jobGraph) { + super(name, id, jobGraph); + } + + /** + * Creates a new job file input vertex. + * + * @param jobGraph + * the job graph this vertex belongs to + */ + public SimpleOutputVertex(JobGraph jobGraph) { + this(null, jobGraph); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index 25bb027..d3ad516 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.jobgraph.tasks; import org.apache.flink.configuration.Configuration; @@ -24,7 +23,6 @@ import org.apache.flink.runtime.execution.Environment; /** * Abstract base class for every task class in Nephele. - * */ public abstract class AbstractInvokable { @@ -42,7 +40,7 @@ public abstract class AbstractInvokable { * Must be overwritten by the concrete task. This method is called by the task manager * when the actual execution of the task starts. * - * @throws Execution + * @throws Exception * thrown if any exception occurs during the execution of the tasks */ public abstract void invoke() throws Exception; @@ -89,9 +87,9 @@ public abstract class AbstractInvokable { } /** - * Returns the task configuration object which was attached to the original {@link JobVertex}. + * Returns the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.AbstractJobVertex}. * - * @return the task configuration object which was attached to the original {@link JobVertex} + * @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.AbstractJobVertex} */ public final Configuration getTaskConfiguration() { @@ -99,9 +97,9 @@ public abstract class AbstractInvokable { } /** - * Returns the job configuration object which was attached to the original {@link JobGraph}. 
+ * Returns the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph}. * - * @return the job configuration object which was attached to the original {@link JobGraph} + * @return the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph} */ public final Configuration getJobConfiguration() { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java index c181b58..7fafeda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/SelfCrossForwardTask.java @@ -16,10 +16,8 @@ * limitations under the License. */ - package org.apache.flink.runtime.executiongraph; - import org.apache.flink.core.io.StringRecord; import org.apache.flink.runtime.io.network.api.RecordReader; import org.apache.flink.runtime.io.network.api.RecordWriter; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java index 68f6496..a409222 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastVarsNepheleITCase.java @@ -35,11 +35,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.io.network.channels.ChannelType; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.InputFormatInputVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException; -import org.apache.flink.runtime.jobgraph.JobInputVertex; -import org.apache.flink.runtime.jobgraph.JobOutputVertex; import org.apache.flink.runtime.jobgraph.JobTaskVertex; +import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex; import org.apache.flink.runtime.operators.CollectorMapDriver; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.RegularPactTask; @@ -227,9 +227,9 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase { // ------------------------------------------------------------------------------------------------------------- @SuppressWarnings("unchecked") - private static JobInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory serializer) { + private static InputFormatInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory serializer) { CsvInputFormat pointsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class, LongValue.class, LongValue.class); - JobInputVertex pointsInput = 
JobGraphUtils.createInput(pointsInFormat, pointsPath, "Input[Points]", jobGraph, numSubTasks); + InputFormatInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "Input[Points]", jobGraph, numSubTasks); { TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration()); @@ -241,9 +241,9 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase { } @SuppressWarnings("unchecked") - private static JobInputVertex createModelsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory serializer) { + private static InputFormatInputVertex createModelsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory serializer) { CsvInputFormat modelsInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class, LongValue.class, LongValue.class); - JobInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, pointsPath, "Input[Models]", jobGraph, numSubTasks); + InputFormatInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, pointsPath, "Input[Models]", jobGraph, numSubTasks); { TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration()); @@ -278,8 +278,8 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase { return pointsInput; } - private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory serializer) { - JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks); + private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory serializer) { + OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks); { TaskConfig taskConfig = new TaskConfig(output.getConfiguration()); @@ -308,10 +308,10 @@ public class BroadcastVarsNepheleITCase extends RecordAPITestBase { JobGraph jobGraph = new JobGraph("Distance Builder"); // -- vertices --------------------------------------------------------------------------------------------- - JobInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer); - JobInputVertex models = createModelsInput(jobGraph, centersPath, numSubTasks, serializer); + InputFormatInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer); + InputFormatInputVertex models = createModelsInput(jobGraph, centersPath, numSubTasks, serializer); JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer); - JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); + OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); // -- edges ------------------------------------------------------------------------------------------------ JobGraphUtils.connect(points, mapper, ChannelType.NETWORK, DistributionPattern.POINTWISE); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java index 77d68f8..4d46b16 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java +++ 
b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/KMeansIterativeNepheleITCase.java @@ -32,11 +32,12 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask; import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask; import org.apache.flink.runtime.iterative.task.IterationTailPactTask; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.InputFormatInputVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException; -import org.apache.flink.runtime.jobgraph.JobInputVertex; -import org.apache.flink.runtime.jobgraph.JobOutputVertex; import org.apache.flink.runtime.jobgraph.JobTaskVertex; +import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex; +import org.apache.flink.runtime.jobgraph.SimpleOutputVertex; import org.apache.flink.runtime.operators.CollectorMapDriver; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.GroupReduceDriver; @@ -95,10 +96,10 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase { // Job vertex builder methods // ------------------------------------------------------------------------------------------------------------- - private static JobInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory serializer) { + private static InputFormatInputVertex createPointsInput(JobGraph jobGraph, String pointsPath, int numSubTasks, TypeSerializerFactory serializer) { @SuppressWarnings("unchecked") CsvInputFormat pointsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class); - JobInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks); + InputFormatInputVertex pointsInput = JobGraphUtils.createInput(pointsInFormat, pointsPath, "[Points]", jobGraph, numSubTasks); { TaskConfig taskConfig = new TaskConfig(pointsInput.getConfiguration()); taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); @@ -116,10 +117,10 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase { return pointsInput; } - private static JobInputVertex createCentersInput(JobGraph jobGraph, String centersPath, int numSubTasks, TypeSerializerFactory serializer) { + private static InputFormatInputVertex createCentersInput(JobGraph jobGraph, String centersPath, int numSubTasks, TypeSerializerFactory serializer) { @SuppressWarnings("unchecked") CsvInputFormat modelsInFormat = new CsvInputFormat('|', IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class); - JobInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks); + InputFormatInputVertex modelsInput = JobGraphUtils.createInput(modelsInFormat, centersPath, "[Models]", jobGraph, numSubTasks); { TaskConfig taskConfig = new TaskConfig(modelsInput.getConfiguration()); @@ -138,9 +139,9 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase { return modelsInput; } - private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory serializer) { + private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory serializer) { - JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks); + OutputFormatOutputVertex output = 
JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks); { TaskConfig taskConfig = new TaskConfig(output.getConfiguration()); @@ -254,8 +255,8 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase { return tail; } - private static JobOutputVertex createSync(JobGraph jobGraph, int numIterations, int dop) { - JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, dop); + private static SimpleOutputVertex createSync(JobGraph jobGraph, int numIterations, int dop) { + SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, dop); TaskConfig syncConfig = new TaskConfig(sync.getConfiguration()); syncConfig.setNumberOfIterations(numIterations); syncConfig.setIterationId(ITERATION_ID); @@ -276,19 +277,19 @@ public class KMeansIterativeNepheleITCase extends RecordAPITestBase { JobGraph jobGraph = new JobGraph("KMeans Iterative"); // -- vertices --------------------------------------------------------------------------------------------- - JobInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer); - JobInputVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer); + InputFormatInputVertex points = createPointsInput(jobGraph, pointsPath, numSubTasks, serializer); + InputFormatInputVertex centers = createCentersInput(jobGraph, centersPath, numSubTasks, serializer); JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer); JobTaskVertex mapper = createMapper(jobGraph, numSubTasks, serializer, serializer, serializer, int0Comparator); JobTaskVertex reducer = createReducer(jobGraph, numSubTasks, serializer, int0Comparator, serializer); - JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks); + SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks); - JobOutputVertex sync = createSync(jobGraph, numIterations, numSubTasks); + SimpleOutputVertex sync = createSync(jobGraph, numIterations, numSubTasks); - JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); + OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); // -- edges ------------------------------------------------------------------------------------------------ JobGraphUtils.connect(points, mapper, ChannelType.NETWORK, DistributionPattern.POINTWISE); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java index 4fd22a3..8da4e5c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java @@ -44,11 +44,12 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask; import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask; import org.apache.flink.runtime.iterative.task.IterationTailPactTask; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.InputFormatInputVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import 
org.apache.flink.runtime.jobgraph.JobGraphDefinitionException; -import org.apache.flink.runtime.jobgraph.JobInputVertex; -import org.apache.flink.runtime.jobgraph.JobOutputVertex; import org.apache.flink.runtime.jobgraph.JobTaskVertex; +import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex; +import org.apache.flink.runtime.jobgraph.SimpleOutputVertex; import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver; import org.apache.flink.runtime.operators.CollectorMapDriver; import org.apache.flink.runtime.operators.DriverStrategy; @@ -173,12 +174,12 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { // Invariant vertices across all variants // ----------------------------------------------------------------------------------------------------------------- - private static JobInputVertex createVerticesInput(JobGraph jobGraph, String verticesPath, int numSubTasks, + private static InputFormatInputVertex createVerticesInput(JobGraph jobGraph, String verticesPath, int numSubTasks, TypeSerializerFactory serializer, TypeComparatorFactory comparator) { @SuppressWarnings("unchecked") CsvInputFormat verticesInFormat = new CsvInputFormat(' ', LongValue.class); - JobInputVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput", + InputFormatInputVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput", jobGraph, numSubTasks); TaskConfig verticesInputConfig = new TaskConfig(verticesInput.getConfiguration()); { @@ -204,13 +205,13 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { return verticesInput; } - private static JobInputVertex createEdgesInput(JobGraph jobGraph, String edgesPath, int numSubTasks, + private static InputFormatInputVertex createEdgesInput(JobGraph jobGraph, String edgesPath, int numSubTasks, TypeSerializerFactory serializer, TypeComparatorFactory comparator) { // edges @SuppressWarnings("unchecked") CsvInputFormat edgesInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class); - JobInputVertex edgesInput = JobGraphUtils.createInput(edgesInFormat, edgesPath, "EdgesInput", jobGraph, + InputFormatInputVertex edgesInput = JobGraphUtils.createInput(edgesInFormat, edgesPath, "EdgesInput", jobGraph, numSubTasks); TaskConfig edgesInputConfig = new TaskConfig(edgesInput.getConfiguration()); { @@ -326,9 +327,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { return intermediate; } - private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, + private static OutputFormatOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory serializer) { - JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks); + OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks); TaskConfig outputConfig = new TaskConfig(output.getConfiguration()); { @@ -351,14 +352,14 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { return output; } - private static JobOutputVertex createFakeTail(JobGraph jobGraph, int numSubTasks) { - JobOutputVertex fakeTailOutput = + private static SimpleOutputVertex createFakeTail(JobGraph jobGraph, int numSubTasks) { + SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks); return fakeTailOutput; } - private static JobOutputVertex createSync(JobGraph 
jobGraph, int numSubTasks, int maxIterations) { - JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks); + private static SimpleOutputVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) { + SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks); TaskConfig syncConfig = new TaskConfig(sync.getConfiguration()); syncConfig.setNumberOfIterations(maxIterations); syncConfig.setIterationId(ITERATION_ID); @@ -388,16 +389,16 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)"); // -- invariant vertices ----------------------------------------------------------------------------------- - JobInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator); - JobInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator); + InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator); + InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator); JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator); JobTaskVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator); TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration()); - JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); - JobOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks); - JobOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations); + OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); + SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks); + SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations); // --------------- the tail (solution set join) --------------- JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph, @@ -472,8 +473,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)"); // input - JobInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator); - JobInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator); + InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator); + InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator); // head JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator); @@ -485,10 +486,10 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration()); // output and auxiliaries - JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); - JobOutputVertex ssFakeTail = createFakeTail(jobGraph, numSubTasks); - JobOutputVertex wsFakeTail = createFakeTail(jobGraph, numSubTasks); - JobOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations); + OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); + SimpleOutputVertex ssFakeTail = createFakeTail(jobGraph, numSubTasks); + SimpleOutputVertex wsFakeTail = 
createFakeTail(jobGraph, numSubTasks); + SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations); // ------------------ the intermediate (ss join) ---------------------- JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, @@ -623,8 +624,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Workset Update, Solution Set Tail)"); // input - JobInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator); - JobInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator); + InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator); + InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator); // head JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator); @@ -636,9 +637,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration()); // output and auxiliaries - JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); - JobOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks); - JobOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations); + OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); + SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks); + SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations); // ------------------ the intermediate (ws update) ---------------------- JobTaskVertex wsUpdateIntermediate = @@ -749,8 +750,8 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Solution Set Update, Workset Tail)"); // input - JobInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator); - JobInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator); + InputFormatInputVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator); + InputFormatInputVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator); // head JobTaskVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator); @@ -760,9 +761,9 @@ public class ConnectedComponentsNepheleITCase extends RecordAPITestBase { TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration()); // output and auxiliaries - JobOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); - JobOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks); - JobOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations); + OutputFormatOutputVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer); + SimpleOutputVertex fakeTail = createFakeTail(jobGraph, numSubTasks); + SimpleOutputVertex sync = createSync(jobGraph, numSubTasks, maxIterations); // ------------------ the intermediate (ss update) ---------------------- JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, 
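For orientation, the sketch below shows the wiring pattern the updated tests share once the vertex types are unified. It is only an illustration assembled from the helper calls whose signatures appear in this diff (JobGraphUtils.createInput, createFileOutput, createFakeOutput, createSync, connect); it is a skeleton, not a complete test. Import packages that are not visible in this mail, the generic bound of createInput (so a plain FileInputFormat<?> is passed), and the iteration id parameter are assumptions, not code taken from the commit.

    // Sketch only (not part of this commit): iteration job wiring with the unified vertex types.
    import org.apache.flink.api.common.io.FileInputFormat;               // assumed package
    import org.apache.flink.runtime.io.network.channels.ChannelType;
    import org.apache.flink.runtime.jobgraph.DistributionPattern;
    import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
    import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
    import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
    import org.apache.flink.runtime.operators.shipping.ShipStrategyType;  // assumed package
    import org.apache.flink.runtime.operators.util.TaskConfig;            // assumed package
    import org.apache.flink.test.iterative.nephele.JobGraphUtils;

    public class UnifiedVertexWiringSketch {

        public static JobGraph build(FileInputFormat<?> format, String inputPath, int numSubTasks,
                int maxIterations, int iterationId) throws JobGraphDefinitionException {

            JobGraph jobGraph = new JobGraph("Unified vertex types (sketch)");

            // data source: InputFormatInputVertex takes the place of the old JobInputVertex
            InputFormatInputVertex input =
                    JobGraphUtils.createInput(format, inputPath, "Input", jobGraph, numSubTasks);
            TaskConfig inputConfig = new TaskConfig(input.getConfiguration());
            inputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

            // file sink: OutputFormatOutputVertex takes the place of JobOutputVertex
            OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
            TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
            outputConfig.addInputToGroup(0);

            // format-less auxiliaries (fake tail, iteration sync) are SimpleOutputVertex now
            SimpleOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks);
            SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
            TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
            syncConfig.setNumberOfIterations(maxIterations);
            syncConfig.setIterationId(iterationId);

            // the real tests place JobTaskVertex head/intermediate/tail vertices in between
            JobGraphUtils.connect(input, output, ChannelType.NETWORK, DistributionPattern.POINTWISE);

            return jobGraph;
        }
    }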
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java index 5a6e4f5..8246d22 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java @@ -33,11 +33,12 @@ import org.apache.flink.runtime.io.network.channels.ChannelType; import org.apache.flink.runtime.iterative.task.IterationHeadPactTask; import org.apache.flink.runtime.iterative.task.IterationTailPactTask; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.InputFormatInputVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException; -import org.apache.flink.runtime.jobgraph.JobInputVertex; -import org.apache.flink.runtime.jobgraph.JobOutputVertex; import org.apache.flink.runtime.jobgraph.JobTaskVertex; +import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex; +import org.apache.flink.runtime.jobgraph.SimpleOutputVertex; import org.apache.flink.runtime.operators.CollectorMapDriver; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.GroupReduceDriver; @@ -130,7 +131,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase { // -------------------------------------------------------------------------------------------------------------- // - input ----------------------------------------------------------------------------------------------------- - JobInputVertex input = JobGraphUtils.createInput( + InputFormatInputVertex input = JobGraphUtils.createInput( new PointInFormat(), inputPath, "Input", jobGraph, numSubTasks); TaskConfig inputConfig = new TaskConfig(input.getConfiguration()); { @@ -213,7 +214,7 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase { } // - output ---------------------------------------------------------------------------------------------------- - JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks); + OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks); TaskConfig outputConfig = new TaskConfig(output.getConfiguration()); { outputConfig.addInputToGroup(0); @@ -224,10 +225,10 @@ public class IterationWithChainingNepheleITCase extends RecordAPITestBase { } // - fake tail ------------------------------------------------------------------------------------------------- - JobOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks); + SimpleOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks); // - sync ------------------------------------------------------------------------------------------------------ - JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks); + SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks); TaskConfig syncConfig = new TaskConfig(sync.getConfiguration()); syncConfig.setNumberOfIterations(maxIterations); 
syncConfig.setIterationId(ITERATION_ID); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java index 4370111..052c7ea 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java @@ -33,11 +33,12 @@ import org.apache.flink.runtime.iterative.io.FakeOutputTask; import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.InputFormatInputVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException; -import org.apache.flink.runtime.jobgraph.JobInputVertex; -import org.apache.flink.runtime.jobgraph.JobOutputVertex; import org.apache.flink.runtime.jobgraph.JobTaskVertex; +import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex; +import org.apache.flink.runtime.jobgraph.SimpleOutputVertex; import org.apache.flink.runtime.operators.DataSinkTask; import org.apache.flink.runtime.operators.DataSourceTask; import org.apache.flink.runtime.operators.RegularPactTask; @@ -55,17 +56,17 @@ public class JobGraphUtils { client.submitJobAndWait(); } - public static > JobInputVertex createInput(T stub, String path, String name, JobGraph graph, + public static > InputFormatInputVertex createInput(T stub, String path, String name, JobGraph graph, int degreeOfParallelism) { stub.setFilePath(path); return createInput(new UserCodeObjectWrapper(stub), name, graph, degreeOfParallelism); } - private static > JobInputVertex createInput(UserCodeWrapper stub, String name, JobGraph graph, + private static > InputFormatInputVertex createInput(UserCodeWrapper stub, String name, JobGraph graph, int degreeOfParallelism) { - JobInputVertex inputVertex = new JobInputVertex(name, graph); + InputFormatInputVertex inputVertex = new InputFormatInputVertex(name, graph); inputVertex.setInvokableClass(DataSourceTask.class); @@ -99,8 +100,8 @@ public class JobGraphUtils { return taskVertex; } - public static JobOutputVertex createSync(JobGraph jobGraph, int degreeOfParallelism) { - JobOutputVertex sync = new JobOutputVertex("BulkIterationSync", jobGraph); + public static SimpleOutputVertex createSync(JobGraph jobGraph, int degreeOfParallelism) { + SimpleOutputVertex sync = new SimpleOutputVertex("BulkIterationSync", jobGraph); sync.setInvokableClass(IterationSynchronizationSinkTask.class); sync.setNumberOfSubtasks(1); TaskConfig syncConfig = new TaskConfig(sync.getConfiguration()); @@ -108,17 +109,17 @@ public class JobGraphUtils { return sync; } - public static JobOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism) + public static SimpleOutputVertex createFakeOutput(JobGraph jobGraph, String name, int degreeOfParallelism) { - JobOutputVertex outputVertex = new JobOutputVertex(name, jobGraph); + SimpleOutputVertex outputVertex = new SimpleOutputVertex(name, jobGraph); outputVertex.setInvokableClass(FakeOutputTask.class); 
outputVertex.setNumberOfSubtasks(degreeOfParallelism); return outputVertex; } - public static JobOutputVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism) + public static OutputFormatOutputVertex createFileOutput(JobGraph jobGraph, String name, int degreeOfParallelism) { - JobOutputVertex sinkVertex = new JobOutputVertex(name, jobGraph); + OutputFormatOutputVertex sinkVertex = new OutputFormatOutputVertex(name, jobGraph); sinkVertex.setInvokableClass(DataSinkTask.class); sinkVertex.setNumberOfSubtasks(degreeOfParallelism); return sinkVertex; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java index 85cedba..aea2c2c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRank.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.test.iterative.nephele.customdanglingpagerank; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; @@ -31,10 +30,11 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask; import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask; import org.apache.flink.runtime.iterative.task.IterationTailPactTask; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.InputFormatInputVertex; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobInputVertex; -import org.apache.flink.runtime.jobgraph.JobOutputVertex; import org.apache.flink.runtime.jobgraph.JobTaskVertex; +import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex; +import org.apache.flink.runtime.jobgraph.SimpleOutputVertex; import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver; import org.apache.flink.runtime.operators.CoGroupDriver; import org.apache.flink.runtime.operators.CollectorMapDriver; @@ -138,7 +138,7 @@ public class CustomCompensatableDanglingPageRank { // --------------- the inputs --------------------- // page rank input - JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(), + InputFormatInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(), pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism); TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration()); pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); @@ -147,7 +147,7 @@ public class CustomCompensatableDanglingPageRank { pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); // edges as adjacency list - JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(), + InputFormatInputVertex adjacencyListInput = JobGraphUtils.createInput(new 
CustomImprovedAdjacencyListInputFormat(), adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism); TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration()); adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); @@ -267,7 +267,7 @@ public class CustomCompensatableDanglingPageRank { // --------------- the output --------------------- - JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism); + OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism); TaskConfig outputConfig = new TaskConfig(output.getConfiguration()); outputConfig.addInputToGroup(0); outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0); @@ -276,10 +276,10 @@ public class CustomCompensatableDanglingPageRank { // --------------- the auxiliaries --------------------- - JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", + SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", degreeOfParallelism); - JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism); + SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism); TaskConfig syncConfig = new TaskConfig(sync.getConfiguration()); syncConfig.setNumberOfIterations(numIterations); syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/cb7039e3/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java index c60f905..a740cf3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java @@ -16,7 +16,6 @@ * limitations under the License. 
*/ - package org.apache.flink.test.iterative.nephele.customdanglingpagerank; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; @@ -31,10 +30,11 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask; import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask; import org.apache.flink.runtime.iterative.task.IterationTailPactTask; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.InputFormatInputVertex; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobInputVertex; -import org.apache.flink.runtime.jobgraph.JobOutputVertex; import org.apache.flink.runtime.jobgraph.JobTaskVertex; +import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex; +import org.apache.flink.runtime.jobgraph.SimpleOutputVertex; import org.apache.flink.runtime.operators.BuildSecondCachedMatchDriver; import org.apache.flink.runtime.operators.CoGroupDriver; import org.apache.flink.runtime.operators.CollectorMapDriver; @@ -138,7 +138,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner { // --------------- the inputs --------------------- // page rank input - JobInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(), + InputFormatInputVertex pageWithRankInput = JobGraphUtils.createInput(new CustomImprovedDanglingPageRankInputFormat(), pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism); TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration()); pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); @@ -147,7 +147,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner { pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); // edges as adjacency list - JobInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(), + InputFormatInputVertex adjacencyListInput = JobGraphUtils.createInput(new CustomImprovedAdjacencyListInputFormat(), adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism); TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration()); adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); @@ -279,7 +279,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner { // --------------- the output --------------------- - JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism); + OutputFormatOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism); TaskConfig outputConfig = new TaskConfig(output.getConfiguration()); outputConfig.addInputToGroup(0); outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0); @@ -288,10 +288,9 @@ public class CustomCompensatableDanglingPageRankWithCombiner { // --------------- the auxiliaries --------------------- - JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", - degreeOfParallelism); + SimpleOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", degreeOfParallelism); - JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism); + SimpleOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism); TaskConfig syncConfig = new TaskConfig(sync.getConfiguration()); 
syncConfig.setNumberOfIterations(numIterations); syncConfig.addIterationAggregator(CustomCompensatableDotProductCoGroup.AGGREGATOR_NAME, new PageRankStatsAggregator());
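For completeness, the same unification can be seen by constructing the vertices directly, mirroring the bodies of the JobGraphUtils helpers changed above. This is a sketch, not code from the commit: only constructors and setters that appear in this diff are used, and the import packages plus the parallelism setter on the input vertex are assumed from the surrounding code base.

    // Sketch only: direct construction of the unified vertex types.
    import org.apache.flink.runtime.iterative.io.FakeOutputTask;
    import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
    import org.apache.flink.runtime.jobgraph.InputFormatInputVertex;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.OutputFormatOutputVertex;
    import org.apache.flink.runtime.jobgraph.SimpleOutputVertex;
    import org.apache.flink.runtime.operators.DataSinkTask;
    import org.apache.flink.runtime.operators.DataSourceTask;

    public class DirectVertexConstructionSketch {

        // source vertex: carries an input format and runs the DataSourceTask
        static InputFormatInputVertex source(String name, JobGraph graph, int dop) {
            InputFormatInputVertex v = new InputFormatInputVertex(name, graph);
            v.setInvokableClass(DataSourceTask.class);
            v.setNumberOfSubtasks(dop); // assumed to be inherited from the common vertex base class
            return v;
        }

        // file sink vertex: carries an output format and runs the DataSinkTask
        static OutputFormatOutputVertex fileSink(String name, JobGraph graph, int dop) {
            OutputFormatOutputVertex v = new OutputFormatOutputVertex(name, graph);
            v.setInvokableClass(DataSinkTask.class);
            v.setNumberOfSubtasks(dop);
            return v;
        }

        // format-less sink vertices: fake iteration tails and the iteration sync
        static SimpleOutputVertex fakeTail(String name, JobGraph graph, int dop) {
            SimpleOutputVertex v = new SimpleOutputVertex(name, graph);
            v.setInvokableClass(FakeOutputTask.class);
            v.setNumberOfSubtasks(dop);
            return v;
        }

        static SimpleOutputVertex sync(JobGraph graph) {
            SimpleOutputVertex v = new SimpleOutputVertex("BulkIterationSync", graph);
            v.setInvokableClass(IterationSynchronizationSinkTask.class);
            v.setNumberOfSubtasks(1);
            return v;
        }
    }

As the helpers above suggest, the remaining split between these classes only reflects whether a vertex carries an input or output format; what a vertex actually executes is determined by the invokable class set on it.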