flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [04/51] [abbrv] flink git commit: [FLINK-2415] [optimizer] Create and attach proper JobGraph describing JSON plans to JobGraph for batch jobs.
Date Thu, 17 Sep 2015 18:19:36 GMT
[FLINK-2415] [optimizer] Create and attach proper JobGraph describing JSON plans to JobGraph for batch jobs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff28981f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff28981f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff28981f

Branch: refs/heads/master
Commit: ff28981f97cc434fb00ff7c931004888a84ad6c6
Parents: b6f52df
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Aug 18 11:04:48 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Sep 17 14:21:49 2015 +0200

----------------------------------------------------------------------
 .../operators/base/BulkIterationBase.java       |   2 +-
 .../operators/base/DeltaIterationBase.java      |   6 +-
 .../flink/optimizer/dag/BinaryUnionNode.java    |   2 +-
 .../flink/optimizer/dag/BulkIterationNode.java  |   4 +-
 .../optimizer/dag/BulkPartialSolutionNode.java  |   2 +-
 .../apache/flink/optimizer/dag/CoGroupNode.java |   2 +-
 .../flink/optimizer/dag/CoGroupRawNode.java     |   2 +-
 .../flink/optimizer/dag/CollectorMapNode.java   |   2 +-
 .../apache/flink/optimizer/dag/CrossNode.java   |   2 +-
 .../flink/optimizer/dag/DagConnection.java      |   4 +-
 .../flink/optimizer/dag/DataSinkNode.java       |   2 +-
 .../flink/optimizer/dag/DataSourceNode.java     |   2 +-
 .../apache/flink/optimizer/dag/FilterNode.java  |   2 +-
 .../apache/flink/optimizer/dag/FlatMapNode.java |   2 +-
 .../flink/optimizer/dag/GroupCombineNode.java   |   2 +-
 .../flink/optimizer/dag/GroupReduceNode.java    |  17 +-
 .../apache/flink/optimizer/dag/JoinNode.java    |   2 +-
 .../org/apache/flink/optimizer/dag/MapNode.java |   2 +-
 .../flink/optimizer/dag/MapPartitionNode.java   |   2 +-
 .../apache/flink/optimizer/dag/MatchNode.java   |   2 +-
 .../flink/optimizer/dag/OptimizerNode.java      |   4 +-
 .../flink/optimizer/dag/PartitionNode.java      |   2 +-
 .../apache/flink/optimizer/dag/ReduceNode.java  |   2 +-
 .../apache/flink/optimizer/dag/SinkJoiner.java  |   2 +-
 .../flink/optimizer/dag/SolutionSetNode.java    |   2 +-
 .../flink/optimizer/dag/SortPartitionNode.java  |   2 +-
 .../flink/optimizer/dag/UnaryOperatorNode.java  |   2 +-
 .../optimizer/dag/WorksetIterationNode.java     |   6 +-
 .../apache/flink/optimizer/dag/WorksetNode.java |   2 +-
 .../apache/flink/optimizer/plan/PlanNode.java   |   2 +-
 .../flink/optimizer/plan/SinkPlanNode.java      |   5 +-
 .../flink/optimizer/plan/SourcePlanNode.java    |   1 -
 .../plandump/PlanJSONDumpGenerator.java         |  16 +-
 .../plantranslate/JobGraphGenerator.java        | 227 +++++++++---
 .../optimizer/plantranslate/JsonMapper.java     | 299 ++++++++++++++++
 .../app/partials/jobs/job.plan.node.jade        |  23 +-
 .../app/scripts/modules/jobs/jobs.dir.coffee    |  21 +-
 flink-runtime-web/web-dashboard/web/js/index.js |  74 ++--
 .../web-dashboard/web/js/vendor.js              | 352 +++++++------------
 .../web/partials/jobs/job.plan.node.html        |  22 +-
 .../apache/flink/runtime/jobgraph/JobEdge.java  |  74 +++-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  11 -
 .../flink/runtime/jobgraph/JobVertex.java       |  64 +++-
 .../jobgraph/jsonplan/JsonPlanGenerator.java    | 142 ++++++++
 .../flink/runtime/operators/DriverStrategy.java |   8 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  11 +-
 .../jobgraph/jsonplan/JsonGeneratorTest.java    | 119 +++++++
 .../jsonplan/JsonJobGraphGenerationTest.java    | 352 +++++++++++++++++++
 48 files changed, 1489 insertions(+), 421 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
index 6304197..4fd4de6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/BulkIterationBase.java
@@ -50,7 +50,7 @@ import org.apache.flink.util.Visitor;
  */
 public class BulkIterationBase<T> extends SingleInputOperator<T, T, AbstractRichFunction> implements IterationOperator {
 	
-	private static String DEFAULT_NAME = "<Unnamed Bulk Iteration>";
+	private static final String DEFAULT_NAME = "<Unnamed Bulk Iteration>";
 	
 	public static final String TERMINATION_CRITERION_AGGREGATOR_NAME = "terminationCriterion.aggregator";
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
index 2986534..9a7674c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/DeltaIterationBase.java
@@ -84,7 +84,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
 	}
 
 	public DeltaIterationBase(BinaryOperatorInformation<ST, WT, ST> operatorInfo, int[] keyPositions) {
-		this(operatorInfo, keyPositions, "<Unnamed Workset-Iteration>");
+		this(operatorInfo, keyPositions, "<Unnamed Delta Iteration>");
 	}
 
 	public DeltaIterationBase(BinaryOperatorInformation<ST, WT, ST> operatorInfo, int keyPosition, String name) {
@@ -283,7 +283,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
 		private final DeltaIterationBase<?, WT> containingIteration;
 
 		public WorksetPlaceHolder(DeltaIterationBase<?, WT> container, OperatorInformation<WT> operatorInfo) {
-			super(operatorInfo, "Workset Place Holder");
+			super(operatorInfo, "Workset");
 			this.containingIteration = container;
 		}
 
@@ -312,7 +312,7 @@ public class DeltaIterationBase<ST, WT> extends DualInputOperator<ST, WT, ST, Ab
 		protected final DeltaIterationBase<ST, ?> containingIteration;
 
 		public SolutionSetPlaceHolder(DeltaIterationBase<ST, ?> container, OperatorInformation<ST> operatorInfo) {
-			super(operatorInfo, "Solution Set Place Holder");
+			super(operatorInfo, "Solution Set");
 			this.containingIteration = container;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index 1600a50..fdd76a8 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -54,7 +54,7 @@ public class BinaryUnionNode extends TwoInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Union";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
index 5dd868e..3d95c22 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -184,7 +184,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Bulk Iteration";
 	}
 
@@ -352,7 +352,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
 		// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
 		if (terminationCriterion == null) {
 			for (PlanNode candidate : candidates) {
-				BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate);
+				BulkIterationPlanNode node = new BulkIterationPlanNode(this, this.getOperator().getName(), in, pspn, candidate);
 				GlobalProperties gProps = candidate.getGlobalProperties().clone();
 				LocalProperties lProps = candidate.getLocalProperties().clone();
 				node.initProperties(gProps, lProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
index 25a7eef..0a02dd1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
@@ -84,7 +84,7 @@ public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Bulk Partial Solution";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
index 20bad0d..58bce4b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
@@ -59,7 +59,7 @@ public class CoGroupNode extends TwoInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "CoGroup";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java
index 971d244..dcc3102 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupRawNode.java
@@ -48,7 +48,7 @@ public class CoGroupRawNode extends TwoInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "CoGroup";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
index 93be1e4..9c1bcd3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
@@ -42,7 +42,7 @@ public class CollectorMapNode extends SingleInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Map";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
index 8de67e8..622bfac 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
@@ -104,7 +104,7 @@ public class CrossNode extends TwoInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Cross";
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
index 4e65976..1f98a11 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
@@ -267,7 +267,7 @@ public class DagConnection implements EstimateProvider, DumpableConnection<Optim
 			buf.append("null");
 		} else {
 			buf.append(this.source.getOperator().getName());
-			buf.append('(').append(this.source.getName()).append(')');
+			buf.append('(').append(this.source.getOperatorName()).append(')');
 		}
 
 		buf.append(" -> ");
@@ -282,7 +282,7 @@ public class DagConnection implements EstimateProvider, DumpableConnection<Optim
 			buf.append("null");
 		} else {
 			buf.append(this.target.getOperator().getName());
-			buf.append('(').append(this.target.getName()).append(')');
+			buf.append('(').append(this.target.getOperatorName()).append(')');
 		}
 
 		return buf.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
index 6ca1149..b35613f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
@@ -92,7 +92,7 @@ public class DataSinkNode extends OptimizerNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Data Sink";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
index 6010f6a..867698d 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
@@ -110,7 +110,7 @@ public class DataSourceNode extends OptimizerNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Data Source";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
index 118ddc8..cb4e7bd 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
@@ -46,7 +46,7 @@ public class FilterNode extends SingleInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Filter";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
index f713d56..7dd84b2 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
@@ -45,7 +45,7 @@ public class FlatMapNode extends SingleInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "FlatMap";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
index 766d6af..7a1dd34 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
@@ -83,7 +83,7 @@ public class GroupCombineNode extends SingleInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "GroupCombine";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
index 51da36b..bd118ec 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -24,7 +24,6 @@ import java.util.List;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.optimizer.CompilerException;
@@ -45,6 +44,8 @@ public class GroupReduceNode extends SingleInputNode {
 	
 	private final List<OperatorDescriptorSingle> possibleProperties;
 	
+	private final String operatorName;
+	
 	private GroupReduceNode combinerUtilityNode;
 	
 	/**
@@ -54,6 +55,7 @@ public class GroupReduceNode extends SingleInputNode {
 	 */
 	public GroupReduceNode(GroupReduceOperatorBase<?, ?, ?> operator) {
 		super(operator);
+		this.operatorName = "GroupReduce";
 		
 		if (this.keys == null) {
 			// case of a key-less reducer. force a parallelism of 1
@@ -63,8 +65,9 @@ public class GroupReduceNode extends SingleInputNode {
 		this.possibleProperties = initPossibleProperties(operator.getCustomPartitioner());
 	}
 	
-	public GroupReduceNode(GroupReduceNode reducerToCopyForCombiner) {
+	private GroupReduceNode(GroupReduceNode reducerToCopyForCombiner) {
 		super(reducerToCopyForCombiner);
+		this.operatorName = "GroupCombine";
 		
 		this.possibleProperties = Collections.emptyList();
 	}
@@ -95,7 +98,7 @@ public class GroupReduceNode extends SingleInputNode {
 		// check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
 		Ordering groupOrder = null;
 		if (getOperator() instanceof GroupReduceOperatorBase) {
-			groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) getOperator()).getGroupOrder();
+			groupOrder = getOperator().getGroupOrder();
 			if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
 				groupOrder = null;
 			}
@@ -131,8 +134,8 @@ public class GroupReduceNode extends SingleInputNode {
 	}
 
 	@Override
-	public String getName() {
-		return "GroupReduce";
+	public String getOperatorName() {
+		return this.operatorName;
 	}
 	
 	@Override
@@ -142,10 +145,8 @@ public class GroupReduceNode extends SingleInputNode {
 
 	@Override
 	protected SemanticProperties getSemanticPropertiesForLocalPropertyFiltering() {
-
 		// Local properties for GroupReduce may only be preserved on key fields.
-		SingleInputSemanticProperties origProps =
-				((SingleInputOperator<?,?,?>) getOperator()).getSemanticProperties();
+		SingleInputSemanticProperties origProps = getOperator().getSemanticProperties();
 		SingleInputSemanticProperties filteredProps = new SingleInputSemanticProperties();
 		FieldSet readSet = origProps.getReadFields(0);
 		if(readSet != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
index cbd58ca..02c9b5b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
@@ -67,7 +67,7 @@ public class JoinNode extends TwoInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Join";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
index 35def59..afbf4a8 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapNode.java
@@ -45,7 +45,7 @@ public class MapNode extends SingleInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Map";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
index 6914c15..575dd63 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MapPartitionNode.java
@@ -49,7 +49,7 @@ public class MapPartitionNode extends SingleInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "MapPartition";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
index de3cd22..ee8ab05 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
@@ -63,7 +63,7 @@ public class MatchNode extends TwoInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Join";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
index 9688bb8..490c304 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
@@ -152,7 +152,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 * 
 	 * @return The node name.
 	 */
-	public abstract String getName();
+	public abstract String getOperatorName();
 
 	/**
 	 * This function connects the predecessors to this operator.
@@ -1119,7 +1119,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	public String toString() {
 		StringBuilder bld = new StringBuilder();
 
-		bld.append(getName());
+		bld.append(getOperatorName());
 		bld.append(" (").append(getOperator().getName()).append(") ");
 
 		int i = 1; 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
index 5c811b0..33383cb 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
@@ -59,7 +59,7 @@ public class PartitionNode extends SingleInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Partition";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
index 52bfb6a..ed010bb 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
@@ -67,7 +67,7 @@ public class ReduceNode extends SingleInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Reduce";
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
index 06606f0..979d254 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
@@ -51,7 +51,7 @@ public class SinkJoiner extends TwoInputNode {
 	}
 	
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Internal Utility Node";
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
index 1292cf5..9b53999 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
@@ -79,7 +79,7 @@ public class SolutionSetNode extends AbstractPartialSolutionNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Solution Set";
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
index 83bc39a..8037533 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
@@ -56,7 +56,7 @@ public class SortPartitionNode extends SingleInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Sort-Partition";
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
index 45ecdac..0c48033 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/UnaryOperatorNode.java
@@ -53,7 +53,7 @@ public class UnaryOperatorNode extends SingleInputNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return this.name;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
index 99c868c..15b9a50 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
@@ -222,7 +222,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Workset Iteration";
 	}
 
@@ -454,7 +454,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 					}
 					
 					WorksetIterationPlanNode wsNode = new WorksetIterationPlanNode(this,
-							"WorksetIteration ("+this.getOperator().getName()+")", solutionSetIn,
+							this.getOperator().getName(), solutionSetIn,
 							worksetIn, sspn, wspn, worksetCandidate, solutionSetCandidate);
 					wsNode.setImmediateSolutionSetUpdate(immediateDeltaUpdate);
 					wsNode.initProperties(gp, lp);
@@ -572,7 +572,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		}
 		
 		@Override
-		public String getName() {
+		public String getOperatorName() {
 			return "Internal Utility Node";
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
index 3b05aba..ae636c5 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
@@ -84,7 +84,7 @@ public class WorksetNode extends AbstractPartialSolutionNode {
 	}
 
 	@Override
-	public String getName() {
+	public String getOperatorName() {
 		return "Workset";
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index 6f634fb..9505a57 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -513,7 +513,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan
 
 	@Override
 	public String toString() {
-		return this.template.getName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy +
+		return this.template.getOperatorName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy +
 				" [[ " + this.globalProps + " ]] [[ " + this.localProps + " ]]";
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
index 656e67f..c28a5c3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.plan;
 
 import org.apache.flink.optimizer.dag.DataSinkNode;
@@ -25,8 +24,8 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 /**
  * Plan candidate node for data flow sinks.
  */
-public class SinkPlanNode extends SingleInputPlanNode
-{
+public class SinkPlanNode extends SingleInputPlanNode {
+	
 	/**
 	 * Constructs a new sink candidate node that uses <i>NONE</i> as its local strategy. Note that
 	 * local sorting and range partitioning are handled by the incoming channel already.

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
index 11b7cc9..937fe71 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.plan;
 
 import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index dc99fd7..e248b0b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -39,7 +39,6 @@ import org.apache.flink.optimizer.dag.BulkIterationNode;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.dag.DataSourceNode;
 import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.DagConnection;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.optimizer.dag.WorksetIterationNode;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -56,9 +55,7 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.StringUtils;
 
-/**
- * 
- */
+
 public class PlanJSONDumpGenerator {
 	
 	private Map<DumpableNode<?>, Integer> nodeIds; // resolves pact nodes to ids
@@ -260,7 +257,7 @@ public class PlanJSONDumpGenerator {
 		}
 		
 		
-		String name = n.getName();
+		String name = n.getOperatorName();
 		if (name.equals("Reduce") && (node instanceof SingleInputPlanNode) && 
 				((SingleInputPlanNode) node).getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE) {
 			name = "Combine";
@@ -309,8 +306,9 @@ public class PlanJSONDumpGenerator {
 				}
 				// output shipping strategy and channel type
 				final Channel channel = (inConn instanceof Channel) ? (Channel) inConn : null; 
-				final ShipStrategyType shipType = channel != null ? channel.getShipStrategy() :
-						((DagConnection) inConn).getShipStrategy();
+				final ShipStrategyType shipType = channel != null ? 
+						channel.getShipStrategy() :
+						inConn.getShipStrategy();
 					
 				String shipStrategy = null;
 				if (shipType != null) {
@@ -623,11 +621,11 @@ public class PlanJSONDumpGenerator {
 		writer.print("\" }");
 	}
 
-	public static final String formatNumber(double number) {
+	public static String formatNumber(double number) {
 		return formatNumber(number, "");
 	}
 
-	public static final String formatNumber(double number, String suffix) {
+	public static String formatNumber(double number, String suffix) {
 		if (number <= 0.0) {
 			return String.valueOf(number);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 943ec2e..0cbcea8 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.optimizer.plantranslate;
 
+import com.fasterxml.jackson.core.JsonFactory;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
@@ -48,7 +50,7 @@ import org.apache.flink.optimizer.plan.WorksetPlanNode;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.optimizer.util.Utils;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
@@ -56,6 +58,7 @@ import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
 import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
 import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
+import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.InputFormatVertex;
@@ -78,6 +81,7 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.Visitor;
 
 import java.io.IOException;
@@ -107,7 +111,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	
 	private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
 	
-	private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null);
+	private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null, null);
 	
 	// ------------------------------------------------------------------------
 
@@ -193,21 +197,27 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
 		}
 		
+		// ----- attach the additional info to the job vertices, for display in the runtime monitor
+		
+		attachOperatorNamesAndDescriptions();
+
+		// ----------- finalize the job graph -----------
+		
 		// create the job graph object
 		JobGraph graph = new JobGraph(program.getJobName());
 		graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
 		graph.setAllowQueuedScheduling(false);
-		
+
 		// add vertices to the graph
 		for (JobVertex vertex : this.vertices.values()) {
 			graph.addVertex(vertex);
 		}
-		
+
 		for (JobVertex vertex : this.auxVertices) {
 			graph.addVertex(vertex);
 			vertex.setSlotSharingGroup(sharingGroup);
 		}
-		
+
 		// add registered cache file into job configuration
 		for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
 			DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
@@ -222,9 +232,6 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
 		}
 
-		String jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(program);
-		graph.setJsonPlan(jsonPlan);
-
 		// release all references again
 		this.vertices = null;
 		this.chainedTasks = null;
@@ -507,10 +514,10 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 					
 					// update name of container task
 					String containerTaskName = container.getName();
-					if(containerTaskName.startsWith("CHAIN ")) {
-						container.setName(containerTaskName+" -> "+chainedTask.getTaskName());
+					if (containerTaskName.startsWith("CHAIN ")) {
+						container.setName(containerTaskName + " -> " + chainedTask.getTaskName());
 					} else {
-						container.setName("CHAIN "+containerTaskName+" -> "+chainedTask.getTaskName());
+						container.setName("CHAIN " + containerTaskName + " -> " + chainedTask.getTaskName());
 					}
 					
 					this.chainedTasksInSequence.add(chainedTask);
@@ -581,7 +588,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			}
 		} catch (Exception e) {
 			throw new CompilerException(
-				"An error occurred while translating the optimized plan to a nephele JobGraph: " + e.getMessage(), e);
+				"An error occurred while translating the optimized plan to a JobGraph: " + e.getMessage(), e);
 		}
 	}
 	
@@ -612,7 +619,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				
 				// check if the iteration's input is a union
 				if (iterationNode.getInput().getSource() instanceof NAryUnionPlanNode) {
-					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput().getSource()).getInputs().iterator();
+					allInChannels = (iterationNode.getInput().getSource()).getInputs().iterator();
 				} else {
 					allInChannels = Collections.singletonList(iterationNode.getInput()).iterator();
 				}
@@ -631,7 +638,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				
 				// check if the iteration's input is a union
 				if (iterationNode.getInput2().getSource() instanceof NAryUnionPlanNode) {
-					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput2().getSource()).getInputs().iterator();
+					allInChannels = (iterationNode.getInput2().getSource()).getInputs().iterator();
 				} else {
 					allInChannels = Collections.singletonList(iterationNode.getInput2()).iterator();
 				}
@@ -760,7 +767,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		final DriverStrategy ds = node.getDriverStrategy();
 		
 		// check, whether chaining is possible
-		boolean chaining = false;
+		boolean chaining;
 		{
 			Channel inConn = node.getInput();
 			PlanNode pred = inConn.getSource();
@@ -804,7 +811,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		if (chaining) {
 			vertex = null;
 			config = new TaskConfig(new Configuration());
-			this.chainedTasks.put(node, new TaskInChain(ds.getPushChainDriverClass(), config, taskName));
+			this.chainedTasks.put(node, new TaskInChain(node, ds.getPushChainDriverClass(), config, taskName));
 		} else {
 			// create task vertex
 			vertex = new JobVertex(taskName);
@@ -820,7 +827,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		
 		// set the driver strategy
 		config.setDriverStrategy(ds);
-		for(int i=0;i<ds.getNumRequiredComparators();i++) {
+		for (int i = 0; i < ds.getNumRequiredComparators(); i++) {
 			config.setDriverComparator(node.getComparator(i), i);
 		}
 		// assign memory, file-handles, etc.
@@ -922,11 +929,11 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		final TaskConfig headConfig;
 		if (merge) {
 			final PlanNode successor = pspn.getOutgoingChannels().get(0).getTarget();
-			headVertex = (JobVertex) this.vertices.get(successor);
+			headVertex = this.vertices.get(successor);
 			
 			if (headVertex == null) {
 				throw new CompilerException(
-					"Bug: Trying to merge solution set with its sucessor, but successor has not been created.");
+					"Bug: Trying to merge solution set with its successor, but successor has not been created.");
 			}
 			
 			// reset the vertex type to iteration head
@@ -990,7 +997,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		final TaskConfig headConfig;
 		if (merge) {
 			final PlanNode successor = wspn.getOutgoingChannels().get(0).getTarget();
-			headVertex = (JobVertex) this.vertices.get(successor);
+			headVertex = this.vertices.get(successor);
 			
 			if (headVertex == null) {
 				throw new CompilerException(
@@ -1050,13 +1057,6 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	 * channel is then the channel into the union node, the local strategy channel the one from the union to the
 	 * actual target operator.
 	 *
-	 * @param channel
-	 * @param inputNumber
-	 * @param sourceVertex
-	 * @param sourceConfig
-	 * @param targetVertex
-	 * @param targetConfig
-	 * @param isBroadcast
 	 * @throws CompilerException
 	 */
 	private DistributionPattern connectJobVertices(Channel channel, int inputNumber,
@@ -1109,7 +1109,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 
 		}
 
-		targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType);
+		JobEdge edge = targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType);
 
 		// -------------- configure the source task's ship strategy strategies in task config --------------
 		final int outputIndex = sourceConfig.getNumOutputs();
@@ -1146,6 +1146,35 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		} else {
 			targetConfig.addInputToGroup(inputNumber);
 		}
+		
+		// ---------------- attach the additional infos to the job edge -------------------
+		
+		String shipStrategy = JsonMapper.getShipStrategyString(channel.getShipStrategy());
+		if (channel.getShipStrategyKeys() != null && channel.getShipStrategyKeys().size() > 0) {
+			shipStrategy += " on " + (channel.getShipStrategySortOrder() == null ?
+					channel.getShipStrategyKeys().toString() :
+					Utils.createOrdering(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder()).toString());
+		}
+		
+		String localStrategy;
+		if (channel.getLocalStrategy() == null || channel.getLocalStrategy() == LocalStrategy.NONE) {
+			localStrategy = null;
+		}
+		else {
+			localStrategy = JsonMapper.getLocalStrategyString(channel.getLocalStrategy());
+			if (localStrategy != null && channel.getLocalStrategyKeys() != null && channel.getLocalStrategyKeys().size() > 0) {
+				localStrategy += " on " + (channel.getLocalStrategySortOrder() == null ?
+						channel.getLocalStrategyKeys().toString() :
+						Utils.createOrdering(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder()).toString());
+			}
+		}
+		
+		String caching = channel.getTempMode() == TempMode.NONE ? null : channel.getTempMode().toString();
+
+		edge.setShipStrategyName(shipStrategy);
+		edge.setPreProcessingOperationName(localStrategy);
+		edge.setOperatorLevelCachingDescription(caching);
+		
 		return distributionPattern;
 	}
 	
@@ -1191,7 +1220,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			
 			if (needsMemory) {
 				// sanity check
-				if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
+				if (tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
 					throw new CompilerException("Bug in compiler: Inconsistent description of input materialization.");
 				}
 				config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory());
@@ -1247,14 +1276,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		final PlanNode rootOfStepFunction = bulkNode.getRootOfStepFunction();
 		final TaskConfig tailConfig;
 		
-		JobVertex rootOfStepFunctionVertex = (JobVertex) this.vertices.get(rootOfStepFunction);
+		JobVertex rootOfStepFunctionVertex = this.vertices.get(rootOfStepFunction);
 		if (rootOfStepFunctionVertex == null) {
 			// last op is chained
 			final TaskInChain taskInChain = this.chainedTasks.get(rootOfStepFunction);
 			if (taskInChain == null) {
 				throw new CompilerException("Bug: Tail of step function not found as vertex or chained task.");
 			}
-			rootOfStepFunctionVertex = (JobVertex) taskInChain.getContainingVertex();
+			rootOfStepFunctionVertex = taskInChain.getContainingVertex();
 
 			// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
 			tailConfig = taskInChain.getTaskConfig();
@@ -1277,7 +1306,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		final TaskConfig tailConfigOfTerminationCriterion;
 		// If we have a termination criterion and it is not an intermediate node
 		if(rootOfTerminationCriterion != null && rootOfTerminationCriterion.getOutgoingChannels().isEmpty()) {
-			JobVertex rootOfTerminationCriterionVertex = (JobVertex) this.vertices.get(rootOfTerminationCriterion);
+			JobVertex rootOfTerminationCriterionVertex = this.vertices.get(rootOfTerminationCriterion);
 			
 			
 			if (rootOfTerminationCriterionVertex == null) {
@@ -1286,7 +1315,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				if (taskInChain == null) {
 					throw new CompilerException("Bug: Tail of termination criterion not found as vertex or chained task.");
 				}
-				rootOfTerminationCriterionVertex = (JobVertex) taskInChain.getContainingVertex();
+				rootOfTerminationCriterionVertex = taskInChain.getContainingVertex();
 
 				// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
 				tailConfigOfTerminationCriterion = taskInChain.getTaskConfig();
@@ -1396,14 +1425,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			{
 				// get the vertex for the workset update
 				final TaskConfig worksetTailConfig;
-				JobVertex nextWorksetVertex = (JobVertex) this.vertices.get(nextWorksetNode);
+				JobVertex nextWorksetVertex = this.vertices.get(nextWorksetNode);
 				if (nextWorksetVertex == null) {
 					// nextWorksetVertex is chained
 					TaskInChain taskInChain = this.chainedTasks.get(nextWorksetNode);
 					if (taskInChain == null) {
 						throw new CompilerException("Bug: Next workset node not found as vertex or chained task.");
 					}
-					nextWorksetVertex = (JobVertex) taskInChain.getContainingVertex();
+					nextWorksetVertex = taskInChain.getContainingVertex();
 					worksetTailConfig = taskInChain.getTaskConfig();
 				} else {
 					worksetTailConfig = new TaskConfig(nextWorksetVertex.getConfiguration());
@@ -1421,14 +1450,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			}
 			{
 				final TaskConfig solutionDeltaConfig;
-				JobVertex solutionDeltaVertex = (JobVertex) this.vertices.get(solutionDeltaNode);
+				JobVertex solutionDeltaVertex = this.vertices.get(solutionDeltaNode);
 				if (solutionDeltaVertex == null) {
 					// last op is chained
 					TaskInChain taskInChain = this.chainedTasks.get(solutionDeltaNode);
 					if (taskInChain == null) {
 						throw new CompilerException("Bug: Solution Set Delta not found as vertex or chained task.");
 					}
-					solutionDeltaVertex = (JobVertex) taskInChain.getContainingVertex();
+					solutionDeltaVertex = taskInChain.getContainingVertex();
 					solutionDeltaConfig = taskInChain.getTaskConfig();
 				} else {
 					solutionDeltaConfig = new TaskConfig(solutionDeltaVertex.getConfiguration());
@@ -1481,7 +1510,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
 	}
 	
-	private static String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
+	private String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
 		try {
 			if (wrapper.hasObject()) {
 				try {
@@ -1499,6 +1528,113 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			return null;
 		}
 	}
+	
+	private void attachOperatorNamesAndDescriptions() {
+		JsonFactory jsonFactory = new JsonFactory();
+
+		// we go back to front
+
+		// start with the in chains 
+		for (int i = chainedTasksInSequence.size() - 1; i >= 0; i--) {
+			TaskInChain next = chainedTasksInSequence.get(i);
+			PlanNode planNode = next.getPlanNode();
+
+			JobVertex vertex = next.getContainingVertex();
+
+			// operator
+			String opName = planNode.getOptimizerNode().getOperatorName();
+			if (vertex.getOperatorName() == null) {
+				vertex.setOperatorName(opName);
+			}
+			else {
+				vertex.setOperatorName(opName + " -> " + vertex.getOperatorName());
+			}
+
+			// operator description 
+			String opDescription = JsonMapper.getOperatorStrategyString(planNode.getDriverStrategy());
+			if (vertex.getOperatorDescription() == null) {
+				vertex.setOperatorDescription(opDescription);
+			}
+			else {
+				vertex.setOperatorDescription(opDescription + "\n -> " + vertex.getOperatorDescription());
+			}
+
+			// pretty name
+			String prettyName = StringUtils.showControlCharacters(planNode.getNodeName());
+			if (vertex.getOperatorPrettyName() == null) {
+				vertex.setOperatorPrettyName(prettyName);
+			}
+			else {
+				vertex.setOperatorPrettyName(prettyName + "\n -> " + vertex.getOperatorPrettyName());
+			}
+
+			// optimizer output properties
+			if (vertex.getResultOptimizerProperties() == null) {
+				// since we go backwards, this must be the last in its chain
+				String outputProps =
+						JsonMapper.getOptimizerPropertiesJson(jsonFactory, planNode);
+				vertex.setResultOptimizerProperties(outputProps);
+			}
+		}
+
+		// finish back-to-front traversal by going over the head vertices
+		for (Map.Entry<PlanNode, JobVertex> entry : vertices.entrySet()) {
+			PlanNode node = entry.getKey();
+			JobVertex vertex = entry.getValue();
+
+			// get the predecessors
+
+			String input1name = null;
+			String input2name = null;
+			int num = 0;
+			for (Channel c : node.getInputs()) {
+				if (num == 0) {
+					input1name = c.getSource().getNodeName();
+				}
+				else if (num == 1) {
+					input2name = c.getSource().getNodeName();
+				}
+				num++;
+			}
+
+			// operator
+			String opName = node.getOptimizerNode().getOperatorName();
+			if (vertex.getOperatorName() == null) {
+				vertex.setOperatorName(opName);
+			}
+			else {
+				vertex.setOperatorName(opName + " -> " + vertex.getOperatorName());
+			}
+
+			// operator description
+			String opStrategy = JsonMapper.getOperatorStrategyString(
+					node.getDriverStrategy(),
+					input1name != null ? input1name : "(unnamed)",
+					input2name != null ? input2name : "(unnamed)");
+
+			if (vertex.getOperatorDescription() == null) {
+				vertex.setOperatorDescription(opStrategy);
+			}
+			else {
+				vertex.setOperatorDescription(opStrategy + "\n -> " + vertex.getOperatorDescription());
+			}
+
+			// pretty name
+			String prettyName = StringUtils.showControlCharacters(node.getNodeName());
+			if (vertex.getOperatorPrettyName() == null) {
+				vertex.setOperatorPrettyName(prettyName);
+			}
+			else {
+				vertex.setOperatorPrettyName(prettyName + "\n -> " + vertex.getOperatorPrettyName());
+			}
+
+			// if there is not yet an output from a chained task, we set this output
+			if (vertex.getResultOptimizerProperties() == null) {
+				vertex.setResultOptimizerProperties(
+						JsonMapper.getOptimizerPropertiesJson(jsonFactory, node));
+			}
+		}
+	}
 
 	// -------------------------------------------------------------------------------------
 	// Descriptors for tasks / configurations that are chained or merged with other tasks
@@ -1516,15 +1652,24 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		
 		private final String taskName;
 		
+		private final PlanNode planNode;
+		
 		private JobVertex containingVertex;
 
-		TaskInChain(Class<? extends ChainedDriver<?, ?>> chainedTask, TaskConfig taskConfig,
-					String taskName) {
+		TaskInChain(PlanNode planNode, Class<? extends ChainedDriver<?, ?>> chainedTask,
+					TaskConfig taskConfig, String taskName) {
+			
+			this.planNode = planNode;
 			this.chainedTask = chainedTask;
 			this.taskConfig = taskConfig;
 			this.taskName = taskName;
 		}
-		
+
+
+		public PlanNode getPlanNode() {
+			return planNode;
+		}
+
 		public Class<? extends ChainedDriver<?, ?>> getChainedTask() {
 			return this.chainedTask;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
new file mode 100644
index 0000000..4d81058
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JsonMapper.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plantranslate;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.api.common.operators.CompilerHints;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+import java.io.IOException;
+import java.io.StringWriter;
+
+import static org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator.formatNumber;
+
+public class JsonMapper {
+
+	public static String getOperatorStrategyString(DriverStrategy strategy) {
+		return getOperatorStrategyString(strategy, "input 1", "input 2");
+	}
+	
+	public static String getOperatorStrategyString(DriverStrategy strategy, String firstInputName, String secondInputName) {
+		if (strategy == null) {
+			return "(null)";
+		}
+		switch (strategy) {
+			case SOURCE:
+				return "Data Source";
+			case SINK:
+				return "Data Sink";
+			
+			case NONE:
+				return "(none)";
+				
+			case BINARY_NO_OP:
+			case UNARY_NO_OP:
+				return "No-Op";
+
+			case COLLECTOR_MAP:
+			case MAP:
+				return "Map";
+
+			case FLAT_MAP:
+				return "FlatMap";
+
+			case MAP_PARTITION:
+				return "Map Partition";
+
+			case ALL_REDUCE:
+				return "Reduce All";
+
+			case ALL_GROUP_REDUCE:
+			case ALL_GROUP_REDUCE_COMBINE:
+				return "Group Reduce All";
+
+			case SORTED_REDUCE:
+				return "Sorted Reduce";
+
+			case SORTED_PARTIAL_REDUCE:
+				return "Sorted Combine/Reduce";
+
+			case SORTED_GROUP_REDUCE:
+				return "Sorted Group Reduce";
+
+			case SORTED_GROUP_COMBINE:
+				return "Sorted Combine";
+
+			case HYBRIDHASH_BUILD_FIRST:
+				return "Hybrid Hash (build: " + firstInputName + ")";
+				
+			case HYBRIDHASH_BUILD_SECOND:
+				return "Hybrid Hash (build: " + secondInputName + ")";
+
+			case HYBRIDHASH_BUILD_FIRST_CACHED:
+				return "Hybrid Hash (CACHED) (build: " + firstInputName + ")";
+
+			case HYBRIDHASH_BUILD_SECOND_CACHED:
+				return "Hybrid Hash (CACHED) (build: " + secondInputName + ")";
+
+			case NESTEDLOOP_BLOCKED_OUTER_FIRST:
+				return "Nested Loops (Blocked Outer: " + firstInputName + ")";
+			case NESTEDLOOP_BLOCKED_OUTER_SECOND:
+				return "Nested Loops (Blocked Outer: " + secondInputName + ")";
+			case NESTEDLOOP_STREAMED_OUTER_FIRST:
+				return "Nested Loops (Streamed Outer: " + firstInputName + ")";
+			case NESTEDLOOP_STREAMED_OUTER_SECOND:
+				return "Nested Loops (Streamed Outer: " + secondInputName + ")";
+
+			case MERGE:
+				return "Merge";
+
+			case CO_GROUP:
+				return "Co-Group";
+
+			default:
+				return strategy.name();
+		}
+	}
+	
+	public static String getShipStrategyString(ShipStrategyType shipType) {
+		if (shipType == null) {
+			return "(null)";
+		}
+		switch (shipType) {
+			case NONE:
+				return "(none)";
+			case FORWARD:
+				return "Forward";
+			case BROADCAST:
+				return "Broadcast";
+			case PARTITION_HASH:
+				return "Hash Partition";
+			case PARTITION_RANGE:
+				return "Range Partition";
+			case PARTITION_RANDOM:
+				return "Redistribute";
+			case PARTITION_FORCED_REBALANCE:
+				return "Rebalance";
+			case PARTITION_CUSTOM:
+				return "Custom Partition";
+			default:
+				return shipType.name();
+		}
+	}
+
+	public static String getLocalStrategyString(LocalStrategy localStrategy) {
+		if (localStrategy == null) {
+			return "(null)";
+		}
+		switch (localStrategy) {
+			case NONE:
+				return "(none)";
+			case SORT:
+				return "Sort";
+			case COMBININGSORT:
+				return "Sort (combining)";
+			default:
+				return localStrategy.name();
+		}
+	}
+	
+	public static String getOptimizerPropertiesJson(JsonFactory jsonFactory, PlanNode node) {
+		try {
+			final StringWriter writer = new StringWriter(256);
+			final JsonGenerator gen = jsonFactory.createGenerator(writer);
+			
+			final OptimizerNode optNode = node.getOptimizerNode();
+			
+			gen.writeStartObject();
+			
+			// global properties
+			if (node.getGlobalProperties() != null) {
+				GlobalProperties gp = node.getGlobalProperties();
+				gen.writeArrayFieldStart("global_properties");
+				
+				addProperty(gen, "Partitioning", gp.getPartitioning().name());
+				if (gp.getPartitioningFields() != null) {
+					addProperty(gen, "Partitioned on", gp.getPartitioningFields().toString());
+				}
+				if (gp.getPartitioningOrdering() != null) {
+					addProperty(gen, "Partitioning Order", gp.getPartitioningOrdering().toString());
+				}
+				else {
+					addProperty(gen, "Partitioning Order", "(none)");
+				}
+				if (optNode.getUniqueFields() == null || optNode.getUniqueFields().size() == 0) {
+					addProperty(gen, "Uniqueness", "not unique");
+				}
+				else {
+					addProperty(gen, "Uniqueness", optNode.getUniqueFields().toString());
+				}
+				
+				gen.writeEndArray();
+			}
+			
+			// local properties
+			if (node.getLocalProperties() != null) {
+				LocalProperties lp = node.getLocalProperties();
+				gen.writeArrayFieldStart("local_properties");
+				
+				if (lp.getOrdering() != null) {
+					addProperty(gen, "Order", lp.getOrdering().toString());
+				}
+				else {
+					addProperty(gen, "Order", "(none)");
+				}
+				if (lp.getGroupedFields() != null && lp.getGroupedFields().size() > 0) {
+					addProperty(gen, "Grouped on", lp.getGroupedFields().toString());
+				} else {
+					addProperty(gen, "Grouping", "not grouped");
+				}
+				if (optNode.getUniqueFields() == null || optNode.getUniqueFields().size() == 0) {
+					addProperty(gen, "Uniqueness", "not unique");
+				}
+				else {
+					addProperty(gen, "Uniqueness", optNode.getUniqueFields().toString());
+				}
+				
+				gen.writeEndArray();
+			}
+
+			// output size estimates
+			{
+				gen.writeArrayFieldStart("estimates");
+				
+				addProperty(gen, "Est. Output Size", optNode.getEstimatedOutputSize() == -1 ? "(unknown)"
+						: formatNumber(optNode.getEstimatedOutputSize(), "B"));
+				
+				addProperty(gen, "Est. Cardinality", optNode.getEstimatedNumRecords() == -1 ? "(unknown)"
+						: formatNumber(optNode.getEstimatedNumRecords()));
+				gen.writeEndArray();
+			}
+			
+			// output node cost
+			if (node.getNodeCosts() != null) {
+				gen.writeArrayFieldStart("costs");
+
+				addProperty(gen, "Network", node.getNodeCosts().getNetworkCost() == -1 ? 
+						"(unknown)" : formatNumber(node.getNodeCosts().getNetworkCost(), "B"));
+				addProperty(gen, "Disk I/O", node.getNodeCosts().getDiskCost() == -1 ? 
+						"(unknown)" : formatNumber(node.getNodeCosts().getDiskCost(), "B"));
+				addProperty(gen, "CPU", node.getNodeCosts().getCpuCost() == -1 ?
+						"(unknown)" : formatNumber(node.getNodeCosts().getCpuCost(), ""));
+
+				addProperty(gen, "Cumulative Network", node.getCumulativeCosts().getNetworkCost() == -1 ?
+						"(unknown)" : formatNumber(node.getCumulativeCosts().getNetworkCost(), "B"));
+				addProperty(gen, "Cumulative Disk I/O", node.getCumulativeCosts().getDiskCost() == -1 ?
+						"(unknown)" : formatNumber(node.getCumulativeCosts().getDiskCost(), "B"));
+				addProperty(gen, "Cumulative CPU", node.getCumulativeCosts().getCpuCost() == -1 ?
+						"(unknown)" : formatNumber(node.getCumulativeCosts().getCpuCost(), ""));
+				
+				gen.writeEndArray();
+			}
+
+
+			// compiler hints
+			if (optNode.getOperator().getCompilerHints() != null) {
+				CompilerHints hints = optNode.getOperator().getCompilerHints();
+				CompilerHints defaults = new CompilerHints();
+				
+				String size = hints.getOutputSize() == defaults.getOutputSize() ? 
+						"(none)" : String.valueOf(hints.getOutputSize());
+				String card = hints.getOutputCardinality() == defaults.getOutputCardinality() ? 
+						"(none)" : String.valueOf(hints.getOutputCardinality());
+				String width = hints.getAvgOutputRecordSize() == defaults.getAvgOutputRecordSize() ? 
+						"(none)" : String.valueOf(hints.getAvgOutputRecordSize());
+				String filter = hints.getFilterFactor() == defaults.getFilterFactor() ? 
+						"(none)" : String.valueOf(hints.getFilterFactor());
+				
+				gen.writeArrayFieldStart("compiler_hints");
+				
+				addProperty(gen, "Output Size (bytes)", size);
+				addProperty(gen, "Output Cardinality", card);
+				addProperty(gen, "Avg. Output Record Size (bytes)", width);
+				addProperty(gen, "Filter Factor", filter);
+				
+				gen.writeEndArray();
+			}
+			
+			
+			gen.writeEndObject();
+			
+			gen.close();
+			return writer.toString();
+		}
+		catch (Exception e) {
+			return "{}";
+		}
+	}
+	
+	
+	private static void addProperty(JsonGenerator gen, String name, String value) throws IOException {
+		gen.writeStartObject();
+		gen.writeStringField("name", name);
+		gen.writeStringField("value", value);
+		gen.writeEndObject();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.jade
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.jade
index 839f91f..82e61db 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.jade
@@ -18,18 +18,18 @@
 .panel.panel-default.panel-multi(ng-if="node")
   .panel-heading.clearfix
     .panel-title
-      | {{ node.pact }}
+      | {{ node.operator }}
 
     .panel-info.first
       | ID: {{ node.id }}
 
-    .panel-info(ng-if="node.contents")
+    .panel-info(ng-if="node.description")
       .label-group
         bs-label(status="{{status}}" ng-repeat="(index, status) in stateList") {{node.vertex.groupvertex[status]}}
 
   .panel-heading.clearfix
-    .panel-info.first.last(ng-if="node.contents")
-      span {{ node.contents }}
+    .panel-info.first.last(ng-if="node.description")
+      span {{ node.description }}
 
   .panel-body
     table.table.table-hover.table-clickable
@@ -79,41 +79,34 @@
         thead
           tr
             th(colspan="2")
-              | Pact Properties
+              | Properties
 
         tbody
           tr
             td Operator
-            td(table-property value="node.driver_strategy")
+            td(table-property value="node.operator_strategy")
 
           tr
             td Parallelism
             td(table-property value="node.parallelism")
 
-          tr
-            td Subtasks-per-instance
-            td(table-property value="node.subtasks_per_instance")
-
 
   .hidden-sm.col-md-4
     table.table.table-properties
       thead
         tr
           th(colspan="2")
-            | Pact Properties
+            | Properties
 
       tbody
         tr
           td Operator
-          td(table-property value="node.driver_strategy")
+          td(table-property value="node.operator_strategy")
 
         tr
           td Parallelism
           td(table-property value="node.parallelism")
 
-        tr
-          td Subtasks-per-instance
-          td(table-property value="node.subtasks_per_instance")
 
     table.table.table-properties(ng-if="node.estimates")
       thead

http://git-wip-us.apache.org/repos/asf/flink/blob/ff28981f/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
index bb4d925..ee309ad 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee
@@ -235,11 +235,6 @@ angular.module('flinkApp')
         'node-iteration'
 
       else
-        if el.pact is "Data Source"
-          'node-source'
-        else if el.pact is "Data Sink"
-          'node-sink'
-        else
           'node-normal'
       
     # creates the label of a node, in info is stored, whether it is a special node (like a mirror in an iteration)
@@ -248,13 +243,13 @@ angular.module('flinkApp')
 
       # Nodename
       if info is "mirror"
-        labelValue += "<h3 class='node-name'>Mirror of " + el.pact + "</h3>"
+        labelValue += "<h3 class='node-name'>Mirror of " + el.operator + "</h3>"
       else
-        labelValue += "<h3 class='node-name'>" + el.pact + "</h3>"
-      if el.contents is ""
+        labelValue += "<h3 class='node-name'>" + el.operator + "</h3>"
+      if el.description is ""
         labelValue += ""
       else
-        stepName = el.contents
+        stepName = el.description
         
         # clean stepName
         stepName = shortenString(stepName)
@@ -268,7 +263,7 @@ angular.module('flinkApp')
         # Otherwise add infos    
         labelValue += "<h5>" + info + " Node</h5>"  if isSpecialIterationNode(info)
         labelValue += "<h5>Parallelism: " + el.parallelism + "</h5>"  unless el.parallelism is ""
-        labelValue += "<h5>Driver Strategy: " + shortenString(el.driver_strategy) + "</h5"  unless el.driver_strategy is `undefined`
+        labelValue += "<h5>Operation: " + shortenString(el.operator_strategy) + "</h5>"  unless el.operator is `undefined`
       
       labelValue += "</a>"
       labelValue
@@ -398,9 +393,9 @@ angular.module('flinkApp')
 
         existingNodes.push el.id
         
-        # create edges from predecessors to current node
-        if el.predecessors?
-          for pred in el.predecessors
+        # create edges from inputs to current node
+        if el.inputs?
+          for pred in el.inputs
             createEdge(g, data, el, existingNodes, pred)
 
       g


Mime
View raw message