flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [41/63] [abbrv] git commit: Adjusted job graph generator to new job graph classes
Date Sun, 21 Sep 2014 02:13:05 GMT
Adjusted job graph generator to new job graph classes


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/09d1c33c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/09d1c33c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/09d1c33c

Branch: refs/heads/master
Commit: 09d1c33c9016748ee971a1a906b1e2549eb3f3ee
Parents: 25acb6b
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Sep 12 14:57:54 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sat Sep 20 20:02:49 2014 +0200

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java | 188 ++++++++-----------
 .../runtime/jobgraph/AbstractJobVertex.java     |  15 +-
 2 files changed, 88 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/09d1c33c/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index ad6bb70..1783e1d 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -56,18 +56,18 @@ import org.apache.flink.compiler.plan.WorksetPlanNode;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
 import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
 import org.apache.flink.runtime.iterative.io.FakeOutputTask;
 import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
 import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
 import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.AbstractJobOutputVertex;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
+import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.CoGroupDriver;
 import org.apache.flink.runtime.operators.CoGroupWithSolutionSetFirstDriver;
 import org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver;
@@ -106,8 +106,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 	
 	// ------------------------------------------------------------------------
 
-	private JobGraph jobGraph; // the job that is currently built
-
 	private Map<PlanNode, AbstractJobVertex> vertices; // a map from optimizer nodes to nephele vertices
 	
 	private Map<PlanNode, TaskInChain> chainedTasks; // a map from optimizer nodes to nephele vertices
@@ -117,8 +115,6 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 	private List<TaskInChain> chainedTasksInSequence;
 	
 	private List<AbstractJobVertex> auxVertices; // auxiliary vertices which are added during job graph generation
-
-	private AbstractJobVertex maxDegreeVertex; // the vertex with the highest degree of parallelism
 	
 	private final int defaultMaxFan;
 	
@@ -155,13 +151,11 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 	 * @return JobGraph generated from PACT plan.
 	 */
 	public JobGraph compileJobGraph(OptimizedPlan program) {
-		this.jobGraph = new JobGraph(program.getJobName());
 		this.vertices = new HashMap<PlanNode, AbstractJobVertex>();
 		this.chainedTasks = new HashMap<PlanNode, TaskInChain>();
 		this.chainedTasksInSequence = new ArrayList<TaskInChain>();
 		this.auxVertices = new ArrayList<AbstractJobVertex>();
 		this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>();
-		this.maxDegreeVertex = null;
 		
 		// generate Nephele job graph
 		program.accept(this);
@@ -184,33 +178,36 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			TaskConfig t = new TaskConfig(tic.getContainingVertex().getConfiguration());
 			t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
 		}
-
+		
+		// create the jobgraph object
+		JobGraph graph = new JobGraph(program.getJobName());
+		graph.setAllowQueuedScheduling(false);
+		
+		// all vertices share the same slot sharing group, for now
+		SlotSharingGroup sharingGroup = new SlotSharingGroup();
+		
+		// add vertices to the graph
 		for (AbstractJobVertex vertex : this.vertices.values()) {
-			if (vertex != this.maxDegreeVertex) {
-				vertex.setVertexToShareInstancesWith(this.maxDegreeVertex);
-			}
+			graph.addVertex(vertex);
+			vertex.setSlotSharingGroup(sharingGroup);
 		}
 		
 		for (AbstractJobVertex vertex : this.auxVertices) {
-			if (vertex != this.maxDegreeVertex) {
-				vertex.setVertexToShareInstancesWith(this.maxDegreeVertex);
-			}
+			graph.addVertex(vertex);
+			vertex.setSlotSharingGroup(sharingGroup);
 		}
-
+		
 		// add registered cache file into job configuration
 		for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
-			DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), this.jobGraph.getJobConfiguration());
+			DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
 		}
-		JobGraph graph = this.jobGraph;
-
+		
 		// release all references again
-		this.maxDegreeVertex = null;
 		this.vertices = null;
 		this.chainedTasks = null;
 		this.chainedTasksInSequence = null;
 		this.auxVertices = null;
 		this.iterations = null;
-		this.jobGraph = null;
 
 		// return job graph
 		return graph;
@@ -347,12 +344,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		if (vertex != null) {
 			// set degree of parallelism
 			int pd = node.getDegreeOfParallelism();
-			vertex.setNumberOfSubtasks(pd);
-	
-			// check whether this is the vertex with the highest degree of parallelism
-			if (this.maxDegreeVertex == null || this.maxDegreeVertex.getNumberOfSubtasks() < pd) {
-				this.maxDegreeVertex = vertex;
-			}
+			vertex.setParallelism(pd);
 			
 			// check whether this vertex is part of an iteration step function
 			if (this.currentIteration != null) {
@@ -651,7 +643,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			if (inConn.isOnDynamicPath()) {
 				numChannelsDynamicPath++;
 				numDynamicSenderTasksTotal += getNumberOfSendersPerReceiver(pattern,
-					sourceVertex.getNumberOfSubtasks(), targetVertex.getNumberOfSubtasks());
+					sourceVertex.getParallelism(), targetVertex.getParallelism());
 			}
 		}
 		
@@ -702,7 +694,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 	// Methods for creating individual vertices
 	// ------------------------------------------------------------------------
 	
-	private JobTaskVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException
{
+	private AbstractJobVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException
{
 		final String taskName = node.getNodeName();
 		final DriverStrategy ds = node.getDriverStrategy();
 		
@@ -745,7 +737,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			}
 		}
 		
-		final JobTaskVertex vertex;
+		final AbstractJobVertex vertex;
 		final TaskConfig config;
 		
 		if (chaining) {
@@ -754,7 +746,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			this.chainedTasks.put(node, new TaskInChain(ds.getPushChainDriverClass(), config, taskName));
 		} else {
 			// create task vertex
-			vertex = new JobTaskVertex(taskName, this.jobGraph);
+			vertex = new AbstractJobVertex(taskName);
 			vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath())
? IterationIntermediatePactTask.class : RegularPactTask.class);
 			
 			config = new TaskConfig(vertex.getConfiguration());
@@ -776,10 +768,10 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		return vertex;
 	}
 
-	private JobTaskVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException
{
+	private AbstractJobVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException
{
 		final String taskName = node.getNodeName();
 		final DriverStrategy ds = node.getDriverStrategy();
-		final JobTaskVertex vertex = new JobTaskVertex(taskName, this.jobGraph);
+		final AbstractJobVertex vertex = new AbstractJobVertex(taskName);
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 		vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath())
? IterationIntermediatePactTask.class : RegularPactTask.class);
 		
@@ -806,7 +798,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 	}
 
 	private InputFormatVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException
{
-		final InputFormatVertex vertex = new InputFormatVertex(node.getNodeName(), this.jobGraph);
+		final InputFormatVertex vertex = new InputFormatVertex(node.getNodeName());
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 
 		vertex.setInvokableClass(DataSourceTask.class);
@@ -819,8 +811,8 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		return vertex;
 	}
 
-	private AbstractJobOutputVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException
{
-		final OutputFormatVertex vertex = new OutputFormatVertex(node.getNodeName(), this.jobGraph);
+	private AbstractJobVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException
{
+		final OutputFormatVertex vertex = new OutputFormatVertex(node.getNodeName());
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
 
 		vertex.setInvokableClass(DataSinkTask.class);
@@ -833,7 +825,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		return vertex;
 	}
 	
-	private JobTaskVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
+	private AbstractJobVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
 		// get the bulk iteration that corresponds to this partial solution node
 		final BulkIterationPlanNode iteration = pspn.getContainingIterationNode();
 		
@@ -864,12 +856,12 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		}
 		
 		// create or adopt the head vertex
-		final JobTaskVertex toReturn;
-		final JobTaskVertex headVertex;
+		final AbstractJobVertex toReturn;
+		final AbstractJobVertex headVertex;
 		final TaskConfig headConfig;
 		if (merge) {
 			final PlanNode successor = pspn.getOutgoingChannels().get(0).getTarget();
-			headVertex = (JobTaskVertex) this.vertices.get(successor);
+			headVertex = (AbstractJobVertex) this.vertices.get(successor);
 			
 			if (headVertex == null) {
 				throw new CompilerException(
@@ -884,7 +876,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			// instantiate the head vertex and give it a no-op driver as the driver strategy.
 			// everything else happens in the post visit, after the input (the initial partial solution)
 			// is connected.
-			headVertex = new JobTaskVertex("PartialSolution ("+iteration.getNodeName()+")", this.jobGraph);
+			headVertex = new AbstractJobVertex("PartialSolution ("+iteration.getNodeName()+")");
 			headVertex.setInvokableClass(IterationHeadPactTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			headConfig.setDriver(NoOpDriver.class);
@@ -901,7 +893,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		return toReturn;
 	}
 	
-	private JobTaskVertex createWorksetIterationHead(WorksetPlanNode wspn) {
+	private AbstractJobVertex createWorksetIterationHead(WorksetPlanNode wspn) {
 		// get the bulk iteration that corresponds to this partial solution node
 		final WorksetIterationPlanNode iteration = wspn.getContainingIterationNode();
 		
@@ -932,12 +924,12 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		}
 		
 		// create or adopt the head vertex
-		final JobTaskVertex toReturn;
-		final JobTaskVertex headVertex;
+		final AbstractJobVertex toReturn;
+		final AbstractJobVertex headVertex;
 		final TaskConfig headConfig;
 		if (merge) {
 			final PlanNode successor = wspn.getOutgoingChannels().get(0).getTarget();
-			headVertex = (JobTaskVertex) this.vertices.get(successor);
+			headVertex = (AbstractJobVertex) this.vertices.get(successor);
 			
 			if (headVertex == null) {
 				throw new CompilerException(
@@ -952,7 +944,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			// instantiate the head vertex and give it a no-op driver as the driver strategy.
 			// everything else happens in the post visit, after the input (the initial partial solution)
 			// is connected.
-			headVertex = new JobTaskVertex("IterationHead("+iteration.getNodeName()+")", this.jobGraph);
+			headVertex = new AbstractJobVertex("IterationHead("+iteration.getNodeName()+")");
 			headVertex.setInvokableClass(IterationHeadPactTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			headConfig.setDriver(NoOpDriver.class);
@@ -1002,35 +994,33 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 	 * @param targetVertex
 	 * @param targetConfig
 	 * @param isBroadcast
-	 * @throws JobGraphDefinitionException
 	 * @throws CompilerException
 	 */
 	private DistributionPattern connectJobVertices(Channel channel, int inputNumber,
 			final AbstractJobVertex sourceVertex, final TaskConfig sourceConfig,
 			final AbstractJobVertex targetVertex, final TaskConfig targetConfig, boolean isBroadcast)
-	throws JobGraphDefinitionException, CompilerException
+	throws CompilerException
 	{
 		// ------------ connect the vertices to the job graph --------------
-		final ChannelType channelType;
 		final DistributionPattern distributionPattern;
 
 		switch (channel.getShipStrategy()) {
 			case FORWARD:
 				distributionPattern = DistributionPattern.POINTWISE;
-				channelType = ChannelType.NETWORK;
 				break;
 			case PARTITION_RANDOM:
 			case BROADCAST:
 			case PARTITION_HASH:
 			case PARTITION_RANGE:
 				distributionPattern = DistributionPattern.BIPARTITE;
-				channelType = ChannelType.NETWORK;
 				break;
 			default:
 				throw new RuntimeException("Unknown runtime ship strategy: " + channel.getShipStrategy());
 		}
 		
-		sourceVertex.connectTo(targetVertex, channelType, distributionPattern);
+		targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern);
+		
+//		sourceVertex.conn/ectTo(targetVertex, channelType, distributionPattern);
 
 		// -------------- configure the source task's ship strategy strategies in task config --------------
 		final int outputIndex = sourceConfig.getNumOutputs();
@@ -1119,7 +1109,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 	private void finalizeBulkIteration(IterationDescriptor descr) {
 		
 		final BulkIterationPlanNode bulkNode = (BulkIterationPlanNode) descr.getIterationNode();
-		final JobTaskVertex headVertex = descr.getHeadTask();
+		final AbstractJobVertex headVertex = descr.getHeadTask();
 		final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
 		final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
 		
@@ -1135,13 +1125,13 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel);
 		
 		// --------------------------- create the sync task ---------------------------
-		final SimpleOutputVertex sync = new SimpleOutputVertex("Sync(" + bulkNode.getNodeName()
+ ")", this.jobGraph);
+		final AbstractJobVertex sync = new AbstractJobVertex("Sync(" + bulkNode.getNodeName() +
")");
 		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
-		sync.setNumberOfSubtasks(1);
+		sync.setParallelism(1);
 		this.auxVertices.add(sync);
 		
 		final TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
-		syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getNumberOfSubtasks());
+		syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism());
 
 		// set the number of iteration / convergence criterion for the sync
 		final int maxNumIterations = bulkNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations();
@@ -1151,12 +1141,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		syncConfig.setNumberOfIterations(maxNumIterations);
 		
 		// connect the sync task
-		try {
-			headVertex.connectTo(sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
-		} catch (JobGraphDefinitionException e) {
-			throw new CompilerException("Bug: Cannot connect head vertex to sync task.");
-		}
-		
+		sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
 		
 		// ----------------------------- create the iteration tail ------------------------------
 		
@@ -1164,14 +1149,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		final PlanNode rootOfStepFunction = bulkNode.getRootOfStepFunction();
 		final TaskConfig tailConfig;
 		
-		JobTaskVertex rootOfStepFunctionVertex = (JobTaskVertex) this.vertices.get(rootOfStepFunction);
+		AbstractJobVertex rootOfStepFunctionVertex = (AbstractJobVertex) this.vertices.get(rootOfStepFunction);
 		if (rootOfStepFunctionVertex == null) {
 			// last op is chained
 			final TaskInChain taskInChain = this.chainedTasks.get(rootOfStepFunction);
 			if (taskInChain == null) {
 				throw new CompilerException("Bug: Tail of step function not found as vertex or chained task.");
 			}
-			rootOfStepFunctionVertex = (JobTaskVertex) taskInChain.getContainingVertex();
+			rootOfStepFunctionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
 
 			// the fake channel is statically typed to pact record. no data is sent over this channel
anyways.
 			tailConfig = taskInChain.getTaskConfig();
@@ -1190,18 +1175,13 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			
 			// create the fake output task
-			SimpleOutputVertex fakeTail = new SimpleOutputVertex("Fake Tail", this.jobGraph);
+			AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
 			fakeTail.setInvokableClass(FakeOutputTask.class);
-			fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
+			fakeTail.setParallelism(headVertex.getParallelism());
 			this.auxVertices.add(fakeTail);
 			
 			// connect the fake tail
-			try {
-				rootOfStepFunctionVertex.connectTo(fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-			} catch (JobGraphDefinitionException e) {
-				throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task");
-			}
-			
+			fakeTail.connectNewDataSetAsInput(rootOfStepFunctionVertex, DistributionPattern.POINTWISE);
 		}
 		
 		
@@ -1209,7 +1189,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		final TaskConfig tailConfigOfTerminationCriterion;
 		// If we have a termination criterion and it is not an intermediate node
 		if(rootOfTerminationCriterion != null && rootOfTerminationCriterion.getOutgoingChannels().isEmpty())
{
-			JobTaskVertex rootOfTerminationCriterionVertex = (JobTaskVertex) this.vertices.get(rootOfTerminationCriterion);
+			AbstractJobVertex rootOfTerminationCriterionVertex = (AbstractJobVertex) this.vertices.get(rootOfTerminationCriterion);
 			
 			
 			if (rootOfTerminationCriterionVertex == null) {
@@ -1218,7 +1198,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 				if (taskInChain == null) {
 					throw new CompilerException("Bug: Tail of termination criterion not found as vertex
or chained task.");
 				}
-				rootOfTerminationCriterionVertex = (JobTaskVertex) taskInChain.getContainingVertex();
+				rootOfTerminationCriterionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
 
 				// the fake channel is statically typed to pact record. no data is sent over this channel
anyways.
 				tailConfigOfTerminationCriterion = taskInChain.getTaskConfig();
@@ -1232,17 +1212,13 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
 			tailConfigOfTerminationCriterion.addOutputShipStrategy(ShipStrategyType.FORWARD);
 			
-			SimpleOutputVertex fakeTailTerminationCriterion = new SimpleOutputVertex("Fake Tail for
Termination Criterion", this.jobGraph);
+			AbstractJobVertex fakeTailTerminationCriterion = new AbstractJobVertex("Fake Tail for
Termination Criterion");
 			fakeTailTerminationCriterion.setInvokableClass(FakeOutputTask.class);
-			fakeTailTerminationCriterion.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
+			fakeTailTerminationCriterion.setParallelism(headVertex.getParallelism());
 			this.auxVertices.add(fakeTailTerminationCriterion);
 		
 			// connect the fake tail
-			try {
-				rootOfTerminationCriterionVertex.connectTo(fakeTailTerminationCriterion, ChannelType.IN_MEMORY,
DistributionPattern.POINTWISE);
-			} catch (JobGraphDefinitionException e) {
-				throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task
for termination criterion");
-			}
+			fakeTailTerminationCriterion.connectNewDataSetAsInput(rootOfTerminationCriterionVertex,
DistributionPattern.POINTWISE);
 			
 			// tell the head that it needs to wait for the solution set updates
 			headConfig.setWaitForSolutionSetUpdate();
@@ -1272,7 +1248,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 	
 	private void finalizeWorksetIteration(IterationDescriptor descr) {
 		final WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) descr.getIterationNode();
-		final JobTaskVertex headVertex = descr.getHeadTask();
+		final AbstractJobVertex headVertex = descr.getHeadTask();
 		final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
 		final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
 		
@@ -1299,13 +1275,13 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		// --------------------------- create the sync task ---------------------------
 		final TaskConfig syncConfig;
 		{
-			final SimpleOutputVertex sync = new SimpleOutputVertex("Sync (" + iterNode.getNodeName()
+ ")", this.jobGraph);
+			final AbstractJobVertex sync = new AbstractJobVertex("Sync (" + iterNode.getNodeName()
+ ")");
 			sync.setInvokableClass(IterationSynchronizationSinkTask.class);
-			sync.setNumberOfSubtasks(1);
+			sync.setParallelism(1);
 			this.auxVertices.add(sync);
 			
 			syncConfig = new TaskConfig(sync.getConfiguration());
-			syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getNumberOfSubtasks());
+			syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism());
 	
 			// set the number of iteration / convergence criterion for the sync
 			final int maxNumIterations = iterNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations();
@@ -1315,11 +1291,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			syncConfig.setNumberOfIterations(maxNumIterations);
 			
 			// connect the sync task
-			try {
-				headVertex.connectTo(sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
-			} catch (JobGraphDefinitionException e) {
-				throw new CompilerException("Bug: Cannot connect head vertex to sync task.");
-			}
+			sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
 		}
 		
 		// ----------------------------- create the iteration tails -----------------------------
@@ -1340,14 +1312,14 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			{
 				// get the vertex for the workset update
 				final TaskConfig worksetTailConfig;
-				JobTaskVertex nextWorksetVertex = (JobTaskVertex) this.vertices.get(nextWorksetNode);
+				AbstractJobVertex nextWorksetVertex = (AbstractJobVertex) this.vertices.get(nextWorksetNode);
 				if (nextWorksetVertex == null) {
 					// nextWorksetVertex is chained
 					TaskInChain taskInChain = this.chainedTasks.get(nextWorksetNode);
 					if (taskInChain == null) {
 						throw new CompilerException("Bug: Next workset node not found as vertex or chained
task.");
 					}
-					nextWorksetVertex = (JobTaskVertex) taskInChain.getContainingVertex();
+					nextWorksetVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
 					worksetTailConfig = taskInChain.getTaskConfig();
 				} else {
 					worksetTailConfig = new TaskConfig(nextWorksetVertex.getConfiguration());
@@ -1364,29 +1336,25 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 					worksetTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 					
 					// create the fake output task
-					SimpleOutputVertex fakeTail = new SimpleOutputVertex("Fake Tail", this.jobGraph);
+					AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
 					fakeTail.setInvokableClass(FakeOutputTask.class);
-					fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
+					fakeTail.setParallelism(headVertex.getParallelism());
 					this.auxVertices.add(fakeTail);
 					
 					// connect the fake tail
-					try {
-						nextWorksetVertex.connectTo(fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-					} catch (JobGraphDefinitionException e) {
-						throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task");
-					}
+					fakeTail.connectNewDataSetAsInput(nextWorksetVertex, DistributionPattern.POINTWISE);
 				}
 			}
 			{
 				final TaskConfig solutionDeltaConfig;
-				JobTaskVertex solutionDeltaVertex = (JobTaskVertex) this.vertices.get(solutionDeltaNode);
+				AbstractJobVertex solutionDeltaVertex = (AbstractJobVertex) this.vertices.get(solutionDeltaNode);
 				if (solutionDeltaVertex == null) {
 					// last op is chained
 					TaskInChain taskInChain = this.chainedTasks.get(solutionDeltaNode);
 					if (taskInChain == null) {
 						throw new CompilerException("Bug: Solution Set Delta not found as vertex or chained
task.");
 					}
-					solutionDeltaVertex = (JobTaskVertex) taskInChain.getContainingVertex();
+					solutionDeltaVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
 					solutionDeltaConfig = taskInChain.getTaskConfig();
 				} else {
 					solutionDeltaConfig = new TaskConfig(solutionDeltaVertex.getConfiguration());
@@ -1402,17 +1370,13 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 					solutionDeltaConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
 	
 					// create the fake output task
-					SimpleOutputVertex fakeTail = new SimpleOutputVertex("Fake Tail", this.jobGraph);
+					AbstractJobVertex fakeTail = new AbstractJobVertex("Fake Tail");
 					fakeTail.setInvokableClass(FakeOutputTask.class);
-					fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
+					fakeTail.setParallelism(headVertex.getParallelism());
 					this.auxVertices.add(fakeTail);
 					
 					// connect the fake tail
-					try {
-						solutionDeltaVertex.connectTo(fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
-					} catch (JobGraphDefinitionException e) {
-						throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task");
-					}
+					fakeTail.connectNewDataSetAsInput(solutionDeltaVertex, DistributionPattern.POINTWISE);
 					
 					// tell the head that it needs to wait for the solution set updates
 					headConfig.setWaitForSolutionSetUpdate();
@@ -1503,7 +1467,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 		
 		private final IterationPlanNode iterationNode;
 		
-		private JobTaskVertex headTask;
+		private AbstractJobVertex headTask;
 		
 		private TaskConfig headConfig;
 		
@@ -1520,7 +1484,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			return iterationNode;
 		}
 		
-		public void setHeadTask(JobTaskVertex headTask, TaskConfig headConfig) {
+		public void setHeadTask(AbstractJobVertex headTask, TaskConfig headConfig) {
 			this.headTask = headTask;
 			this.headFinalResultConfig = new TaskConfig(new Configuration());
 			
@@ -1532,7 +1496,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode>
{
 			this.headConfig = headConfig;
 		}
 		
-		public JobTaskVertex getHeadTask() {
+		public AbstractJobVertex getHeadTask() {
 			return headTask;
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/09d1c33c/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index cc2cbd8..82823b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -44,9 +44,6 @@ public class AbstractJobVertex implements java.io.Serializable {
 	/** The ID of the vertex. */
 	private final JobVertexID id;
 
-	/** The name of the vertex */
-	private final String name;
-
 	/** List of produced data sets, one per writer */
 	private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
 
@@ -67,6 +64,9 @@ public class AbstractJobVertex implements java.io.Serializable {
 	
 	/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
 	private SlotSharingGroup slotSharingGroup;
+	
+	/** The name of the vertex */
+	private String name;
 
 	// --------------------------------------------------------------------------------------------
 
@@ -109,6 +109,15 @@ public class AbstractJobVertex implements java.io.Serializable {
 	public String getName() {
 		return this.name;
 	}
+	
+	/**
+	 * Sets the name of the vertex
+	 * 
+	 * @param name The new name.
+	 */
+	public void setName(String name) {
+		this.name = name == null ? DEFAULT_NAME : name;
+	}
 
 	/**
 	 * Returns the number of produced intermediate data sets.


Mime
View raw message