flink-commits mailing list archives

From se...@apache.org
Subject [51/53] [abbrv] flink git commit: [FLINK-441] [optimizer] Rename Pact* and Nephele* classes
Date Fri, 20 Mar 2015 10:07:30 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
index 85f1aa7..e4b35b7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
@@ -38,7 +38,7 @@ import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.costs.Costs;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -105,8 +105,8 @@ public class DataSourceNode extends OptimizerNode {
 	 * @return The contract.
 	 */
 	@Override
-	public GenericDataSourceBase<?, ?> getPactContract() {
-		return (GenericDataSourceBase<?, ?>) super.getPactContract();
+	public GenericDataSourceBase<?, ?> getOperator() {
+		return (GenericDataSourceBase<?, ?>) super.getOperator();
 	}
 
 	@Override
@@ -123,8 +123,8 @@ public class DataSourceNode extends OptimizerNode {
 	}
 
 	@Override
-	public List<PactConnection> getIncomingConnections() {
-		return Collections.<PactConnection>emptyList();
+	public List<DagConnection> getIncomingConnections() {
+		return Collections.<DagConnection>emptyList();
 	}
 
 	@Override
@@ -139,13 +139,13 @@ public class DataSourceNode extends OptimizerNode {
 			String inFormatDescription = "<unknown>";
 			
 			try {
-				format = getPactContract().getFormatWrapper().getUserCodeObject();
-				Configuration config = getPactContract().getParameters();
+				format = getOperator().getFormatWrapper().getUserCodeObject();
+				Configuration config = getOperator().getParameters();
 				format.configure(config);
 			}
 			catch (Throwable t) {
-				if (PactCompiler.LOG.isWarnEnabled()) {
-					PactCompiler.LOG.warn("Could not instantiate InputFormat to obtain statistics."
+				if (Optimizer.LOG.isWarnEnabled()) {
+					Optimizer.LOG.warn("Could not instantiate InputFormat to obtain statistics."
 						+ " Limited statistics will be available.", t);
 				}
 				return;
@@ -158,7 +158,7 @@ public class DataSourceNode extends OptimizerNode {
 			}
 			
 			// first of all, get the statistics from the cache
-			final String statisticsKey = getPactContract().getStatisticsKey();
+			final String statisticsKey = getOperator().getStatisticsKey();
 			final BaseStatistics cachedStatistics = statistics.getBaseStatistics(statisticsKey);
 			
 			BaseStatistics bs = null;
@@ -166,16 +166,16 @@ public class DataSourceNode extends OptimizerNode {
 				bs = format.getStatistics(cachedStatistics);
 			}
 			catch (Throwable t) {
-				if (PactCompiler.LOG.isWarnEnabled()) {
-					PactCompiler.LOG.warn("Error obtaining statistics from input format: " + t.getMessage(), t);
+				if (Optimizer.LOG.isWarnEnabled()) {
+					Optimizer.LOG.warn("Error obtaining statistics from input format: " + t.getMessage(), t);
 				}
 			}
 			
 			if (bs != null) {
 				final long len = bs.getTotalInputSize();
 				if (len == BaseStatistics.SIZE_UNKNOWN) {
-					if (PactCompiler.LOG.isInfoEnabled()) {
-						PactCompiler.LOG.info("Compiler could not determine the size of input '" + inFormatDescription + "'. Using default estimates.");
+					if (Optimizer.LOG.isInfoEnabled()) {
+						Optimizer.LOG.info("Compiler could not determine the size of input '" + inFormatDescription + "'. Using default estimates.");
 					}
 				}
 				else if (len >= 0) {
@@ -207,14 +207,14 @@ public class DataSourceNode extends OptimizerNode {
 			return this.cachedPlans;
 		}
 
-		SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getPactContract().getName()+")",
+		SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getOperator().getName()+")",
 				this.gprops, this.lprops);
 
 		if(!replicatedInput) {
 			candidate.updatePropertiesWithUniqueSets(getUniqueFields());
 
 			final Costs costs = new Costs();
-			if (FileInputFormat.class.isAssignableFrom(getPactContract().getFormatWrapper().getUserCodeClass()) &&
+			if (FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass()) &&
 					this.estimatedOutputSize >= 0) {
 				estimator.addFileInputCost(this.estimatedOutputSize, costs);
 			}
@@ -223,10 +223,10 @@ public class DataSourceNode extends OptimizerNode {
 			// replicated input
 			final Costs costs = new Costs();
 			InputFormat<?,?> inputFormat =
-					((ReplicatingInputFormat<?,?>)getPactContract().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
+					((ReplicatingInputFormat<?,?>) getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
 			if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) &&
 					this.estimatedOutputSize >= 0) {
-				estimator.addFileInputCost(this.estimatedOutputSize * this.getDegreeOfParallelism(), costs);
+				estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), costs);
 			}
 			candidate.setCosts(costs);
 		}

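The hunks above also show, in condensed form, how the data source node derives its size estimates: it instantiates the user's InputFormat from the operator's user-code wrapper, configures it, consults the statistics cache by key, and lets the format refine the cached values. A minimal sketch of that flow, using only calls visible in this diff ("node" and "statistics" stand in for the surrounding fields, and the original error handling is reduced to a single catch; this is illustrative, not the actual method body):

    import org.apache.flink.api.common.io.InputFormat;
    import org.apache.flink.api.common.io.statistics.BaseStatistics;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.optimizer.DataStatistics;
    import org.apache.flink.optimizer.dag.DataSourceNode;

    // Condensed sketch of the statistics flow shown in the diff above.
    static BaseStatistics refreshStatistics(DataSourceNode node, DataStatistics statistics) {
        try {
            // instantiate and configure the user's input format
            InputFormat<?, ?> format = node.getOperator().getFormatWrapper().getUserCodeObject();
            Configuration config = node.getOperator().getParameters();
            format.configure(config);

            // first consult the cache, then let the format refine the estimate
            String statisticsKey = node.getOperator().getStatisticsKey();
            BaseStatistics cached = statistics.getBaseStatistics(statisticsKey);
            return format.getStatistics(cached);
        }
        catch (Throwable t) {
            // as in the diff: fall back to limited statistics
            return null;
        }
    }
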
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
index 5370678..482951b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
@@ -16,10 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
-
+/**
+ * Methods for operators / connections that provide estimates about data size and
+ * characteristics.
+ */
 public interface EstimateProvider {
 	
 	/**

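The interface body lies outside this hunk, but its method set can be read off the @Override methods on the connection class later in this commit: accessors for estimated output size, record count, and average record width. An illustrative constant-valued implementation, assuming that is the complete method set:

    import org.apache.flink.optimizer.dag.EstimateProvider;

    // Illustrative only: a fixed-value estimate provider. The method set is
    // inferred from the overrides visible elsewhere in this commit.
    public class FixedEstimates implements EstimateProvider {

        @Override
        public long getEstimatedOutputSize() {
            return 1024L * 1024L;               // 1 MiB of output data
        }

        @Override
        public long getEstimatedNumRecords() {
            return 10000L;                      // ten thousand records
        }

        @Override
        public float getEstimatedAvgWidthPerOutputRecord() {
            return (1024f * 1024f) / 10000f;    // average bytes per record
        }
    }
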
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
index c8dda12..118ddc8 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
@@ -41,8 +41,8 @@ public class FilterNode extends SingleInputNode {
 	}
 
 	@Override
-	public FilterOperatorBase<?, ?> getPactContract() {
-		return (FilterOperatorBase<?, ?>) super.getPactContract();
+	public FilterOperatorBase<?, ?> getOperator() {
+		return (FilterOperatorBase<?, ?>) super.getOperator();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
index bb5799c..f713d56 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
@@ -40,8 +40,8 @@ public class FlatMapNode extends SingleInputNode {
 	}
 
 	@Override
-	public FlatMapOperatorBase<?, ?, ?> getPactContract() {
-		return (FlatMapOperatorBase<?, ?, ?>) super.getPactContract();
+	public FlatMapOperatorBase<?, ?, ?> getOperator() {
+		return (FlatMapOperatorBase<?, ?, ?>) super.getOperator();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
index 2e2576d..564c0d3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
@@ -54,7 +54,7 @@ public class GroupCombineNode extends SingleInputNode {
 	private List<OperatorDescriptorSingle> initPossibleProperties() {
 
 		// check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
-		Ordering groupOrder = getPactContract().getGroupOrder();
+		Ordering groupOrder = getOperator().getGroupOrder();
 		if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
 			groupOrder = null;
 		}
@@ -74,8 +74,8 @@ public class GroupCombineNode extends SingleInputNode {
 	 * @return The operator represented by this optimizer node.
 	 */
 	@Override
-	public GroupCombineOperatorBase<?, ?, ?> getPactContract() {
-		return (GroupCombineOperatorBase<?, ?, ?>) super.getPactContract();
+	public GroupCombineOperatorBase<?, ?, ?> getOperator() {
+		return (GroupCombineOperatorBase<?, ?, ?>) super.getOperator();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
index 88ec53a..77acae5 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.operators.AllGroupReduceProperties;
 import org.apache.flink.optimizer.operators.AllGroupWithPartialPreGroupProperties;
 import org.apache.flink.optimizer.operators.GroupReduceProperties;
@@ -67,17 +67,17 @@ public class GroupReduceNode extends SingleInputNode {
 	
 	private List<OperatorDescriptorSingle> initPossibleProperties(Partitioner<?> customPartitioner) {
 		// see if an internal hint dictates the strategy to use
-		final Configuration conf = getPactContract().getParameters();
-		final String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
+		final Configuration conf = getOperator().getParameters();
+		final String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
 
 		final boolean useCombiner;
 		if (localStrategy != null) {
-			if (PactCompiler.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) {
+			if (Optimizer.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) {
 				useCombiner = false;
 			}
-			else if (PactCompiler.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) {
+			else if (Optimizer.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) {
 				if (!isCombineable()) {
-					PactCompiler.LOG.warn("Strategy hint for GroupReduce '" + getPactContract().getName() + 
+					Optimizer.LOG.warn("Strategy hint for GroupReduce '" + getOperator().getName() +
 						"' requires combinable reduce, but user function is not marked combinable.");
 				}
 				useCombiner = true;
@@ -90,8 +90,8 @@ public class GroupReduceNode extends SingleInputNode {
 		
 		// check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
 		Ordering groupOrder = null;
-		if (getPactContract() instanceof GroupReduceOperatorBase) {
-			groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) getPactContract()).getGroupOrder();
+		if (getOperator() instanceof GroupReduceOperatorBase) {
+			groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) getOperator()).getGroupOrder();
 			if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
 				groupOrder = null;
 			}
@@ -112,8 +112,8 @@ public class GroupReduceNode extends SingleInputNode {
 	 * @return The operator represented by this optimizer node.
 	 */
 	@Override
-	public GroupReduceOperatorBase<?, ?, ?> getPactContract() {
-		return (GroupReduceOperatorBase<?, ?, ?>) super.getPactContract();
+	public GroupReduceOperatorBase<?, ?, ?> getOperator() {
+		return (GroupReduceOperatorBase<?, ?, ?>) super.getOperator();
 	}
 
 	/**
@@ -123,7 +123,7 @@ public class GroupReduceNode extends SingleInputNode {
 	 * @return True, if a combiner has been given, false otherwise.
 	 */
 	public boolean isCombineable() {
-		return getPactContract().isCombinable();
+		return getOperator().isCombinable();
 	}
 
 	@Override

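The strategy-resolution code above reads its hint from the operator's parameter Configuration under Optimizer.HINT_LOCAL_STRATEGY (formerly on PactCompiler). A hedged sketch of the writing side, i.e. how such a hint could be attached to a group-reduce operator; the diff only shows the reading side, so treat the setter usage as an assumption of this sketch:

    import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
    import org.apache.flink.optimizer.Optimizer;

    // Sketch: request the plain sort-based local strategy (no combiner) for one
    // operator by writing the hint the optimizer reads above. Whether mutating
    // the parameters at this point of the program is supported is an assumption.
    static void forceSortStrategy(GroupReduceOperatorBase<?, ?, ?> reduceOp) {
        reduceOp.getParameters().setString(
                Optimizer.HINT_LOCAL_STRATEGY,
                Optimizer.HINT_LOCAL_STRATEGY_SORT);
    }
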
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
index 3c67108..cbd58ca 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/JoinNode.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
 import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
 import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
@@ -62,8 +62,8 @@ public class JoinNode extends TwoInputNode {
 	 * @return The contract.
 	 */
 	@Override
-	public JoinOperatorBase<?, ?, ?, ?> getPactContract() {
-		return (JoinOperatorBase<?, ?, ?, ?>) super.getPactContract();
+	public JoinOperatorBase<?, ?, ?, ?> getOperator() {
+		return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
 	}
 
 	@Override
@@ -116,21 +116,21 @@ public class JoinNode extends TwoInputNode {
 	{
 		// see if an internal hint dictates the strategy to use
 		Configuration conf = joinOperatorBase.getParameters();
-		String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
+		String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
 
 		if (localStrategy != null) {
 			final AbstractJoinDescriptor fixedDriverStrat;
-			if (PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
-				PactCompiler.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
-				PactCompiler.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
-				PactCompiler.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
+			if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
+				Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
+				Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
+				Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
 			{
 				fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
 			}
-			else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
+			else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
 				fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
 			}
-			else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
+			else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
 				fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
index cbcab2b..de3cd22 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/MatchNode.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.operators.HashJoinBuildFirstProperties;
 import org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties;
 import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
@@ -58,8 +58,8 @@ public class MatchNode extends TwoInputNode {
 	 * @return The contract.
 	 */
 	@Override
-	public JoinOperatorBase<?, ?, ?, ?> getPactContract() {
-		return (JoinOperatorBase<?, ?, ?, ?>) super.getPactContract();
+	public JoinOperatorBase<?, ?, ?, ?> getOperator() {
+		return (JoinOperatorBase<?, ?, ?, ?>) super.getOperator();
 	}
 
 	@Override
@@ -110,19 +110,19 @@ public class MatchNode extends TwoInputNode {
 	private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint) {
 		// see if an internal hint dictates the strategy to use
 		Configuration conf = joinOperatorBase.getParameters();
-		String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
+		String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
 
 		if (localStrategy != null) {
 			final OperatorDescriptorDual fixedDriverStrat;
-			if (PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
-				PactCompiler.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
-				PactCompiler.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
-				PactCompiler.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
+			if (Optimizer.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
+				Optimizer.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
+				Optimizer.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
+				Optimizer.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
 			{
 				fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
-			} else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
+			} else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
 				fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
-			} else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
+			} else if (Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
 				fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
 			} else {
 				throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
index baeb1f7..0cad34e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
 import java.util.ArrayList;
@@ -48,8 +47,19 @@ import org.apache.flink.util.Visitable;
 import org.apache.flink.util.Visitor;
 
 /**
- * This class represents a node in the optimizer's internal representation of the PACT plan. It contains
- * extra information about estimates, hints and data properties.
+ * The OptimizerNode is the base class of all nodes in the optimizer DAG. The optimizer DAG is the
+ * optimizer's representation of a program, created before the actual optimization (which creates different
+ * candidate plans and computes their cost).
+ * <p>
+ * Nodes in the DAG correspond (almost) one-to-one to the operators in a program. The optimizer DAG is constructed
+ * to hold the additional information that the optimizer needs:
+ * <ul>
+ *     <li>Estimates of the data size processed by each operator</li>
+ *     <li>Helper structures to track where the data flow "splits" and "joins", to support flows that are
+ *         DAGs but not trees.</li>
+ *     <li>Tags and weights to differentiate between loop-variant and -invariant parts of an iteration</li>
+ *     <li>Interesting properties to be used during the enumeration of candidate plans</li>
+ * </ul>
  */
 public abstract class OptimizerNode implements Visitable<OptimizerNode>, EstimateProvider, DumpableNode<OptimizerNode> {
 	
@@ -59,13 +69,13 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	//                                      Members
 	// --------------------------------------------------------------------------------------------
 
-	private final Operator<?> pactContract; // The operator (Reduce / Join / DataSource / ...)
+	private final Operator<?> operator; // The operator (Reduce / Join / DataSource / ...)
 	
 	private List<String> broadcastConnectionNames = new ArrayList<String>(); // the broadcast inputs names of this node
 	
-	private List<PactConnection> broadcastConnections = new ArrayList<PactConnection>(); // the broadcast inputs of this node
+	private List<DagConnection> broadcastConnections = new ArrayList<DagConnection>(); // the broadcast inputs of this node
 	
-	private List<PactConnection> outgoingConnections; // The links to succeeding nodes
+	private List<DagConnection> outgoingConnections; // The links to succeeding nodes
 
 	private InterestingProperties intProps; // the interesting properties of this node
 	
@@ -88,7 +98,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 
 	// --------------------------------- General Parameters ---------------------------------------
 	
-	private int degreeOfParallelism = -1; // the number of parallel instances of this node
+	private int parallelism = -1; // the number of parallel instances of this node
 	
 	private long minimalMemoryPerSubTask = -1;
 
@@ -105,18 +115,17 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new node for the optimizer plan.
+	 * Creates a new optimizer node that represents the given program operator.
 	 * 
 	 * @param op The operator that the node represents.
 	 */
 	public OptimizerNode(Operator<?> op) {
-		this.pactContract = op;
+		this.operator = op;
 		readStubAnnotations();
 	}
 	
 	protected OptimizerNode(OptimizerNode toCopy) {
-		this.pactContract = toCopy.pactContract;
-		
+		this.operator = toCopy.operator;
 		this.intProps = toCopy.intProps;
 		
 		this.openBranches = toCopy.openBranches;
@@ -125,7 +134,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		this.estimatedOutputSize = toCopy.estimatedOutputSize;
 		this.estimatedNumRecords = toCopy.estimatedNumRecords;
 		
-		this.degreeOfParallelism = toCopy.degreeOfParallelism;
+		this.parallelism = toCopy.parallelism;
 		this.minimalMemoryPerSubTask = toCopy.minimalMemoryPerSubTask;
 		
 		this.id = toCopy.id;
@@ -156,7 +165,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 									ExecutionMode defaultExchangeMode);
 
 	/**
-	 * This function connects the predecessors to this operator.
+	 * This function connects the operators that produce the broadcast inputs to this operator.
 	 *
 	 * @param operatorToNode The map from program operators to optimizer nodes.
 	 * @param defaultExchangeMode The data exchange mode to use, if the operator does not
@@ -164,21 +173,19 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 *
 	 * @throws CompilerException
 	 */
-	public void setBroadcastInputs(Map<Operator<?>, OptimizerNode> operatorToNode, ExecutionMode defaultExchangeMode)
-			throws CompilerException
-	{
+	public void setBroadcastInputs(Map<Operator<?>, OptimizerNode> operatorToNode, ExecutionMode defaultExchangeMode) {
 		// skip for Operators that don't support broadcast variables 
-		if (!(getPactContract() instanceof AbstractUdfOperator<?, ?>)) {
+		if (!(getOperator() instanceof AbstractUdfOperator<?, ?>)) {
 			return;
 		}
 
 		// get all broadcast inputs
-		AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, ?>) getPactContract());
+		AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, ?>) getOperator());
 
 		// create connections and add them
 		for (Map.Entry<String, Operator<?>> input : operator.getBroadcastInputs().entrySet()) {
 			OptimizerNode predecessor = operatorToNode.get(input.getValue());
-			PactConnection connection = new PactConnection(predecessor, this,
+			DagConnection connection = new DagConnection(predecessor, this,
 															ShipStrategyType.BROADCAST, defaultExchangeMode);
 			addBroadcastConnection(input.getKey(), connection);
 			predecessor.addOutgoingConnection(connection);
@@ -186,11 +193,12 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	}
 
 	/**
+	 * Gets all incoming connections of this node.
 	 * This method needs to be overridden by subclasses to return the children.
 	 * 
 	 * @return The list of incoming connections.
 	 */
-	public abstract List<PactConnection> getIncomingConnections();
+	public abstract List<DagConnection> getIncomingConnections();
 
 	/**
 	 * Tells the node to compute the interesting properties for its inputs. The interesting properties
@@ -215,7 +223,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 															List<UnclosedBranchDescriptor> branchesSoFar)
 	{
 		// handle the data flow branching for the broadcast inputs
-		for (PactConnection broadcastInput : getBroadcastConnections()) {
+		for (DagConnection broadcastInput : getBroadcastConnections()) {
 			OptimizerNode bcSource = broadcastInput.getSource();
 			addClosedBranches(bcSource.closedBranchingNodes);
 			
@@ -263,11 +271,11 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	public Iterable<OptimizerNode> getPredecessors() {
 		List<OptimizerNode> allPredecessors = new ArrayList<OptimizerNode>();
 
-		for (PactConnection pactConnection : getIncomingConnections()) {
-			allPredecessors.add(pactConnection.getSource());
+		for (DagConnection dagConnection : getIncomingConnections()) {
+			allPredecessors.add(dagConnection.getSource());
 		}
 		
-		for (PactConnection conn : getBroadcastConnections()) {
+		for (DagConnection conn : getBroadcastConnections()) {
 			allPredecessors.add(conn.getSource());
 		}
 		
@@ -306,7 +314,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 * 
 	 * @param broadcastConnection The connection to add.
 	 */
-	public void addBroadcastConnection(String name, PactConnection broadcastConnection) {
+	public void addBroadcastConnection(String name, DagConnection broadcastConnection) {
 		this.broadcastConnectionNames.add(name);
 		this.broadcastConnections.add(broadcastConnection);
 	}
@@ -321,26 +329,26 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	/**
 	 * Return the list of inputs associated with broadcast variables for this node.
 	 */
-	public List<PactConnection> getBroadcastConnections() {
+	public List<DagConnection> getBroadcastConnections() {
 		return this.broadcastConnections;
 	}
 
 	/**
 	 * Adds a new outgoing connection to this node.
 	 * 
-	 * @param pactConnection
+	 * @param connection
 	 *        The connection to add.
 	 */
-	public void addOutgoingConnection(PactConnection pactConnection) {
+	public void addOutgoingConnection(DagConnection connection) {
 		if (this.outgoingConnections == null) {
-			this.outgoingConnections = new ArrayList<PactConnection>();
+			this.outgoingConnections = new ArrayList<DagConnection>();
 		} else {
 			if (this.outgoingConnections.size() == 64) {
 				throw new CompilerException("Cannot currently handle nodes with more than 64 outputs.");
 			}
 		}
 
-		this.outgoingConnections.add(pactConnection);
+		this.outgoingConnections.add(connection);
 	}
 
 	/**
@@ -348,7 +356,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 * 
 	 * @return The list of outgoing connections.
 	 */
-	public List<PactConnection> getOutgoingConnections() {
+	public List<DagConnection> getOutgoingConnections() {
 		return this.outgoingConnections;
 	}
 
@@ -357,8 +365,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 * 
 	 * @return This node's operator.
 	 */
-	public Operator<?> getPactContract() {
-		return this.pactContract;
+	public Operator<?> getOperator() {
+		return this.operator;
 	}
 
 	/**
@@ -369,8 +377,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 * 
 	 * @return The parallelism of the operator.
 	 */
-	public int getDegreeOfParallelism() {
-		return this.degreeOfParallelism;
+	public int getParallelism() {
+		return this.parallelism;
 	}
 
 	/**
@@ -386,7 +394,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		if (parallelism < 1 && parallelism != -1) {
 			throw new IllegalArgumentException("Degree of parallelism of " + parallelism + " is invalid.");
 		}
-		this.degreeOfParallelism = parallelism;
+		this.parallelism = parallelism;
 	}
 	
 	/**
@@ -395,7 +403,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 * @return The total amount of memory across all subtasks.
 	 */
 	public long getMinimalMemoryAcrossAllSubTasks() {
-		return this.minimalMemoryPerSubTask == -1 ? -1 : this.minimalMemoryPerSubTask * this.degreeOfParallelism;
+		return this.minimalMemoryPerSubTask == -1 ? -1 : this.minimalMemoryPerSubTask * this.parallelism;
 	}
 	
 	public boolean isOnDynamicPath() {
@@ -406,13 +414,13 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		boolean anyDynamic = false;
 		boolean allDynamic = true;
 		
-		for (PactConnection conn : getIncomingConnections()) {
+		for (DagConnection conn : getIncomingConnections()) {
 			boolean dynamicIn = conn.isOnDynamicPath();
 			anyDynamic |= dynamicIn;
 			allDynamic &= dynamicIn;
 		}
 		
-		for (PactConnection conn : getBroadcastConnections()) {
+		for (DagConnection conn : getBroadcastConnections()) {
 			boolean dynamicIn = conn.isOnDynamicPath();
 			anyDynamic |= dynamicIn;
 			allDynamic &= dynamicIn;
@@ -424,7 +432,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 			if (!allDynamic) {
 				// this node joins static and dynamic path.
 				// mark the connections where the source is not dynamic as cached
-				for (PactConnection conn : getIncomingConnections()) {
+				for (DagConnection conn : getIncomingConnections()) {
 					if (!conn.getSource().isOnDynamicPath()) {
 						conn.setMaterializationMode(conn.getMaterializationMode().makeCached());
 					}
@@ -442,10 +450,10 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	
 	public int getMaxDepth() {
 		int maxDepth = 0;
-		for (PactConnection conn : getIncomingConnections()) {
+		for (DagConnection conn : getIncomingConnections()) {
 			maxDepth = Math.max(maxDepth, conn.getMaxDepth());
 		}
-		for (PactConnection conn : getBroadcastConnections()) {
+		for (DagConnection conn : getBroadcastConnections()) {
 			maxDepth = Math.max(maxDepth, conn.getMaxDepth());
 		}
 		
@@ -502,7 +510,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		if (this.outgoingConnections == null) {
 			throw new IllegalStateException("The outgoing connections have not yet been initialized.");
 		}
-		for (PactConnection conn : getOutgoingConnections()) {
+		for (DagConnection conn : getOutgoingConnections()) {
 			conn.markBreaksPipeline();
 		}
 	}
@@ -517,7 +525,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 * @return True, if on all outgoing connections, the interesting properties are set. False otherwise.
 	 */
 	public boolean haveAllOutputConnectionInterestingProperties() {
-		for (PactConnection conn : getOutgoingConnections()) {
+		for (DagConnection conn : getOutgoingConnections()) {
 			if (conn.getInterestingProperties() == null) {
 				return false;
 			}
@@ -535,7 +543,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 * leaves the original objects, contained by the connections, unchanged.
 	 */
 	public void computeUnionOfInterestingPropertiesFromSuccessors() {
-		List<PactConnection> conns = getOutgoingConnections();
+		List<DagConnection> conns = getOutgoingConnections();
 		if (conns.size() == 0) {
 			// no incoming, we have none ourselves
 			this.intProps = new InterestingProperties();
@@ -550,10 +558,10 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	
 	public void clearInterestingProperties() {
 		this.intProps = null;
-		for (PactConnection conn : getIncomingConnections()) {
+		for (DagConnection conn : getIncomingConnections()) {
 			conn.clearInterestingProperties();
 		}
-		for (PactConnection conn : getBroadcastConnections()) {
+		for (DagConnection conn : getBroadcastConnections()) {
 			conn.clearInterestingProperties();
 		}
 	}
@@ -570,7 +578,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 */
 	public void computeOutputEstimates(DataStatistics statistics) {
 		// sanity checking
-		for (PactConnection c : getIncomingConnections()) {
+		for (DagConnection c : getIncomingConnections()) {
 			if (c.getSource() == null) {
 				throw new CompilerException("Bug: Estimate computation called before inputs have been set.");
 			}
@@ -587,11 +595,11 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		}
 		
 		// overwrite default estimates with hints, if given
-		if (getPactContract() == null || getPactContract().getCompilerHints() == null) {
+		if (getOperator() == null || getOperator().getCompilerHints() == null) {
 			return ;
 		}
 		
-		CompilerHints hints = getPactContract().getCompilerHints();
+		CompilerHints hints = getOperator().getCompilerHints();
 		if (hints.getOutputSize() >= 0) {
 			this.estimatedOutputSize = hints.getOutputSize();
 		}
@@ -643,8 +651,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	}
 	
 	protected void readUniqueFieldsAnnotation() {
-		if (this.pactContract.getCompilerHints() != null) {
-			Set<FieldSet> uniqueFieldSets = pactContract.getCompilerHints().getUniqueFields();
+		if (this.operator.getCompilerHints() != null) {
+			Set<FieldSet> uniqueFieldSets = operator.getCompilerHints().getUniqueFields();
 			if (uniqueFieldSets != null) {
 				if (this.uniqueFields == null) {
 					this.uniqueFields = new HashSet<FieldSet>();
@@ -686,7 +694,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		
 		// for pruning, we are quasi AFTER the node, so in the presence of
 		// branches, we need form the per-branch-choice groups by the choice
-		// they made at the latest unjoined branching node. Note that this is
+		// they made at the latest un-joined branching node. Note that this is
 		// different from the check for branch compatibility of candidates, as
 		// this happens on the input sub-plans and hence BEFORE the node (therefore
 		// it is relevant to find the latest (partially) joined branch point.
@@ -708,12 +716,12 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 				
 				@Override
 				public int compare(PlanNode o1, PlanNode o2) {
-					for (int i = 0; i < branchDeterminers.length; i++) {
-						PlanNode n1 = o1.getCandidateAtBranchPoint(branchDeterminers[i]);
-						PlanNode n2 = o2.getCandidateAtBranchPoint(branchDeterminers[i]);
+					for (OptimizerNode branchDeterminer : branchDeterminers) {
+						PlanNode n1 = o1.getCandidateAtBranchPoint(branchDeterminer);
+						PlanNode n2 = o2.getCandidateAtBranchPoint(branchDeterminer);
 						int hash1 = System.identityHashCode(n1);
 						int hash2 = System.identityHashCode(n2);
-						
+
 						if (hash1 != hash2) {
 							return hash1 - hash2;
 						}
@@ -775,8 +783,10 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	
 	protected void prunePlanAlternativesWithCommonBranching(List<PlanNode> plans) {
 		// for each interesting property, which plans are cheapest
-		final RequestedGlobalProperties[] gps = (RequestedGlobalProperties[]) this.intProps.getGlobalProperties().toArray(new RequestedGlobalProperties[this.intProps.getGlobalProperties().size()]);
-		final RequestedLocalProperties[] lps = (RequestedLocalProperties[]) this.intProps.getLocalProperties().toArray(new RequestedLocalProperties[this.intProps.getLocalProperties().size()]);
+		final RequestedGlobalProperties[] gps = this.intProps.getGlobalProperties().toArray(
+							new RequestedGlobalProperties[this.intProps.getGlobalProperties().size()]);
+		final RequestedLocalProperties[] lps = this.intProps.getLocalProperties().toArray(
+							new RequestedLocalProperties[this.intProps.getLocalProperties().size()]);
 		
 		final PlanNode[][] toKeep = new PlanNode[gps.length][];
 		final PlanNode[] cheapestForGlobal = new PlanNode[gps.length];
@@ -831,14 +841,12 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 			plans.add(cheapest);
 			cheapest.setPruningMarker(); // remember that that plan is in the set
 		}
-		
-		// skip the top down delta cost check for now (TODO: implement this)
+
 		// add all others, which are optimal for some interesting properties
 		for (int i = 0; i < gps.length; i++) {
 			if (toKeep[i] != null) {
 				final PlanNode[] localMatches = toKeep[i];
-				for (int k = 0; k < localMatches.length; k++) {
-					final PlanNode n = localMatches[k];
+				for (final PlanNode n : localMatches) {
 					if (n != null && !n.isPruneMarkerSet()) {
 						n.setPruningMarker();
 						plans.add(n);
@@ -873,7 +881,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	}
 
 
-	protected List<UnclosedBranchDescriptor> getBranchesForParent(PactConnection toParent) {
+	protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection toParent) {
 		if (this.outgoingConnections.size() == 1) {
 			// return our own stack of open branches, because nothing is added
 			if (this.openBranches == null || this.openBranches.isEmpty()) {
@@ -954,8 +962,8 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 	 * a) There is no branch in the sub-plan of this node
 	 * b) Both candidates have the same candidate as the child at the last open branch. 
 	 * 
-	 * @param plan1
-	 * @param plan2
+	 * @param plan1 The root node of the first candidate plan.
+	 * @param plan2 The root node of the second candidate plan.
 	 * @return True if the nodes are branch compatible in the inputs.
 	 */
 	protected boolean areBranchCompatible(PlanNode plan1, PlanNode plan2) {
@@ -1086,39 +1094,6 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		Collections.reverse(result);
 		return didCloseABranch;
 	}
-	
-	/**
-	 *
-	 */
-	public static final class UnclosedBranchDescriptor
-	{
-		protected OptimizerNode branchingNode;
-
-		protected long joinedPathsVector;
-
-		/**
-		 * @param branchingNode
-		 * @param joinedPathsVector
-		 */
-		protected UnclosedBranchDescriptor(OptimizerNode branchingNode, long joinedPathsVector)
-		{
-			this.branchingNode = branchingNode;
-			this.joinedPathsVector = joinedPathsVector;
-		}
-
-		public OptimizerNode getBranchingNode() {
-			return this.branchingNode;
-		}
-
-		public long getJoinedPathsVector() {
-			return this.joinedPathsVector;
-		}
-		
-		@Override
-		public String toString() {
-			return "(" + this.branchingNode.getPactContract() + ") [" + this.joinedPathsVector + "]";
-		}
-	}
 
 	@Override
 	public OptimizerNode getOptimizerNode() {
@@ -1145,14 +1120,53 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
 		StringBuilder bld = new StringBuilder();
 
 		bld.append(getName());
-		bld.append(" (").append(getPactContract().getName()).append(") ");
+		bld.append(" (").append(getOperator().getName()).append(") ");
 
 		int i = 1; 
-		for (PactConnection conn : getIncomingConnections()) {
+		for (DagConnection conn : getIncomingConnections()) {
 			String shipStrategyName = conn.getShipStrategy() == null ? "null" : conn.getShipStrategy().name();
 			bld.append('(').append(i++).append(":").append(shipStrategyName).append(')');
 		}
 
 		return bld.toString();
 	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Description of an unclosed branch. An unclosed branch occurs when the data flow branches (one operator's
+	 * result is consumed by multiple targets) but these different branches (targets) have not yet been joined
+	 * together.
+	 */
+	public static final class UnclosedBranchDescriptor {
+
+		protected OptimizerNode branchingNode;
+
+		protected long joinedPathsVector;
+
+		/**
+		 * Creates a new branching descriptor.
+		 *
+		 * @param branchingNode The node where the branch occurred (the node with multiple outputs).
+		 * @param joinedPathsVector A bit vector describing which branches are tracked by this descriptor.
+		 *                          Each bit is one where the corresponding branch is tracked, and zero otherwise.
+		 */
+		protected UnclosedBranchDescriptor(OptimizerNode branchingNode, long joinedPathsVector) {
+			this.branchingNode = branchingNode;
+			this.joinedPathsVector = joinedPathsVector;
+		}
+
+		public OptimizerNode getBranchingNode() {
+			return this.branchingNode;
+		}
+
+		public long getJoinedPathsVector() {
+			return this.joinedPathsVector;
+		}
+
+		@Override
+		public String toString() {
+			return "(" + this.branchingNode.getOperator() + ") [" + this.joinedPathsVector + "]";
+		}
+	}
 }

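The reworked Javadoc above describes branch tracking with a per-node bit vector over outgoing connections, which is also why addOutgoingConnection rejects more than 64 outputs. A conceptual sketch of how such long-based path vectors behave (illustrative only; this is not the optimizer's actual bookkeeping code):

    // Each outgoing connection of a branching node is assigned one bit. When two
    // sub-plans meet, their path vectors are OR-ed; the branch counts as closed
    // once every output bit of the branching node has been seen.
    final class BranchBits {

        static long pathThroughOutput(int outputIndex) {
            return 1L << outputIndex;            // bit i <=> outgoing connection i
        }

        static long joinPaths(long left, long right) {
            return left | right;                 // union of branches seen so far
        }

        static boolean allBranchesJoined(long joinedPathsVector, int numOutputs) {
            long allOutputs = (numOutputs >= 64) ? -1L : (1L << numOutputs) - 1L;
            return (joinedPathsVector & allOutputs) == allOutputs;
        }
    }
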
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java
deleted file mode 100644
index 661ceb5..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PactConnection.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.plandump.DumpableConnection;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-/**
- * A connection between to operators. Represents an intermediate result
- * and a data exchange between the two operators.
- *
- * The data exchange has a mode in which it performs (batch / pipelined)
- *
- * The data exchange strategy may be set on this connection, in which case
- * it is fixed and will not be determined during candidate plan enumeration.
- *
- * During the enumeration of interesting properties, this connection also holds
- * all interesting properties generated by the successor operator.
- */
-public class PactConnection implements EstimateProvider, DumpableConnection<OptimizerNode> {
-	
-	private final OptimizerNode source; // The source node of the connection
-
-	private final OptimizerNode target; // The target node of the connection.
-
-	private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange
-
-	private InterestingProperties interestingProps; // local properties that succeeding nodes are interested in
-
-	private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined.
-	
-	private TempMode materializationMode = TempMode.NONE; // the materialization mode
-	
-	private int maxDepth = -1;
-
-	private boolean breakPipeline;  // whether this connection should break the pipeline due to potential deadlocks
-
-	/**
-	 * Creates a new Connection between two nodes. The shipping strategy is by default <tt>NONE</tt>.
-	 * The temp mode is by default <tt>NONE</tt>.
-	 * 
-	 * @param source
-	 *        The source node.
-	 * @param target
-	 *        The target node.
-	 */
-	public PactConnection(OptimizerNode source, OptimizerNode target, ExecutionMode exchangeMode) {
-		this(source, target, null, exchangeMode);
-	}
-
-	/**
-	 * Creates a new Connection between two nodes.
-	 * 
-	 * @param source
-	 *        The source node.
-	 * @param target
-	 *        The target node.
-	 * @param shipStrategy
-	 *        The shipping strategy.
-	 * @param exchangeMode
-	 *        The data exchange mode (pipelined / batch / batch only for shuffles / ... )
-	 */
-	public PactConnection(OptimizerNode source, OptimizerNode target,
-							ShipStrategyType shipStrategy, ExecutionMode exchangeMode)
-	{
-		if (source == null || target == null) {
-			throw new NullPointerException("Source and target must not be null.");
-		}
-		this.source = source;
-		this.target = target;
-		this.shipStrategy = shipStrategy;
-		this.dataExchangeMode = exchangeMode;
-	}
-	
-	/**
-	 * Constructor to create a result from an operator that is not
-	 * consumed by another operator.
-	 * 
-	 * @param source
-	 *        The source node.
-	 */
-	public PactConnection(OptimizerNode source, ExecutionMode exchangeMode) {
-		if (source == null) {
-			throw new NullPointerException("Source and target must not be null.");
-		}
-		this.source = source;
-		this.target = null;
-		this.shipStrategy = ShipStrategyType.NONE;
-		this.dataExchangeMode = exchangeMode;
-	}
-
-	/**
-	 * Gets the source of the connection.
-	 * 
-	 * @return The source Node.
-	 */
-	public OptimizerNode getSource() {
-		return this.source;
-	}
-
-	/**
-	 * Gets the target of the connection.
-	 * 
-	 * @return The target node.
-	 */
-	public OptimizerNode getTarget() {
-		return this.target;
-	}
-
-	/**
-	 * Gets the shipping strategy for this connection.
-	 * 
-	 * @return The connection's shipping strategy.
-	 */
-	public ShipStrategyType getShipStrategy() {
-		return this.shipStrategy;
-	}
-
-	/**
-	 * Sets the shipping strategy for this connection.
-	 * 
-	 * @param strategy
-	 *        The shipping strategy to be applied to this connection.
-	 */
-	public void setShipStrategy(ShipStrategyType strategy) {
-		this.shipStrategy = strategy;
-	}
-
-	/**
-	 * Gets the data exchange mode to use for this connection.
-	 *
-	 * @return The data exchange mode to use for this connection.
-	 */
-	public ExecutionMode getDataExchangeMode() {
-		if (dataExchangeMode == null) {
-			throw new IllegalStateException("This connection does not have the data exchange mode set");
-		}
-		return dataExchangeMode;
-	}
-
-	/**
-	 * Marks that this connection should do a decoupled data exchange (such as batched)
-	 * rather then pipeline data. Connections are marked as pipeline breakers to avoid
-	 * deadlock situations.
-	 */
-	public void markBreaksPipeline() {
-		this.breakPipeline = true;
-	}
-
-	/**
-	 * Checks whether this connection is marked to break the pipeline.
-	 *
-	 * @return True, if this connection is marked to break the pipeline, false otherwise.
-	 */
-	public boolean isBreakingPipeline() {
-		return this.breakPipeline;
-	}
-
-	/**
-	 * Gets the interesting properties object for this pact connection.
-	 * If the interesting properties for this connections have not yet been set,
-	 * this method returns null.
-	 * 
-	 * @return The collection of all interesting properties, or null, if they have not yet been set.
-	 */
-	public InterestingProperties getInterestingProperties() {
-		return this.interestingProps;
-	}
-
-	/**
-	 * Sets the interesting properties for this pact connection.
-	 * 
-	 * @param props The interesting properties.
-	 */
-	public void setInterestingProperties(InterestingProperties props) {
-		if (this.interestingProps == null) {
-			this.interestingProps = props;
-		} else {
-			throw new IllegalStateException("Interesting Properties have already been set.");
-		}
-	}
-	
-	public void clearInterestingProperties() {
-		this.interestingProps = null;
-	}
-	
-	public void initMaxDepth() {
-		
-		if (this.maxDepth == -1) {
-			this.maxDepth = this.source.getMaxDepth() + 1;
-		} else {
-			throw new IllegalStateException("Maximum path depth has already been initialized.");
-		}
-	}
-	
-	public int getMaxDepth() {
-		if (this.maxDepth != -1) {
-			return this.maxDepth;
-		} else {
-			throw new IllegalStateException("Maximum path depth has not been initialized.");
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Estimates
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public long getEstimatedOutputSize() {
-		return this.source.getEstimatedOutputSize();
-	}
-
-	@Override
-	public long getEstimatedNumRecords() {
-		return this.source.getEstimatedNumRecords();
-	}
-	
-	@Override
-	public float getEstimatedAvgWidthPerOutputRecord() {
-		return this.source.getEstimatedAvgWidthPerOutputRecord();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	
-	public TempMode getMaterializationMode() {
-		return this.materializationMode;
-	}
-	
-	public void setMaterializationMode(TempMode materializationMode) {
-		this.materializationMode = materializationMode;
-	}
-	
-	public boolean isOnDynamicPath() {
-		return this.source.isOnDynamicPath();
-	}
-	
-	public int getCostWeight() {
-		return this.source.getCostWeight();
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public String toString() {
-		StringBuilder buf = new StringBuilder(50);
-		buf.append("Connection: ");
-
-		if (this.source == null) {
-			buf.append("null");
-		} else {
-			buf.append(this.source.getPactContract().getName());
-			buf.append('(').append(this.source.getName()).append(')');
-		}
-
-		buf.append(" -> ");
-
-		if (this.shipStrategy != null) {
-			buf.append('[');
-			buf.append(this.shipStrategy.name());
-			buf.append(']').append(' ');
-		}
-
-		if (this.target == null) {
-			buf.append("null");
-		} else {
-			buf.append(this.target.getPactContract().getName());
-			buf.append('(').append(this.target.getName()).append(')');
-		}
-
-		return buf.toString();
-	}
-}

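The class deleted above reappears in this commit as DagConnection (its new source file is not part of this message), and the call sites in OptimizerNode suggest the constructor keeps the same shape. A sketch of wiring a broadcast input with the renamed class, mirroring setBroadcastInputs above; the DagConnection signature is inferred from the old PactConnection and the new usages, not from the new file itself:

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.optimizer.dag.DagConnection;
    import org.apache.flink.optimizer.dag.OptimizerNode;
    import org.apache.flink.runtime.operators.shipping.ShipStrategyType;

    // Sketch: create a broadcast-side connection and register it on both ends,
    // as OptimizerNode.setBroadcastInputs does above.
    static void connectBroadcastInput(OptimizerNode predecessor, OptimizerNode target,
                                      String broadcastName, ExecutionMode exchangeMode) {
        DagConnection connection =
                new DagConnection(predecessor, target, ShipStrategyType.BROADCAST, exchangeMode);
        target.addBroadcastConnection(broadcastName, connection);
        predecessor.addOutgoingConnection(connection);
    }
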
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
index 00b54ac..5c811b0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/PartitionNode.java
@@ -49,13 +49,13 @@ public class PartitionNode extends SingleInputNode {
 		super(operator);
 		
 		OperatorDescriptorSingle descr = new PartitionDescriptor(
-					this.getPactContract().getPartitionMethod(), this.keys, operator.getCustomPartitioner());
+					this.getOperator().getPartitionMethod(), this.keys, operator.getCustomPartitioner());
 		this.possibleProperties = Collections.singletonList(descr);
 	}
 
 	@Override
-	public PartitionOperatorBase<?> getPactContract() {
-		return (PartitionOperatorBase<?>) super.getPactContract();
+	public PartitionOperatorBase<?> getOperator() {
+		return (PartitionOperatorBase<?>) super.getOperator();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
index 6f4c43d..1477038 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
@@ -62,8 +62,8 @@ public class ReduceNode extends SingleInputNode {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public ReduceOperatorBase<?, ?> getPactContract() {
-		return (ReduceOperatorBase<?, ?>) super.getPactContract();
+	public ReduceOperatorBase<?, ?> getOperator() {
+		return (ReduceOperatorBase<?, ?>) super.getOperator();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
index 9217beb..cc12bb8 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SingleInputNode.java
@@ -36,7 +36,7 @@ import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputOperator;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
 import org.apache.flink.optimizer.dataproperties.InterestingProperties;
@@ -68,7 +68,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 	
 	protected final FieldSet keys; 			// The set of key fields
 	
-	protected PactConnection inConn; 		// the input of the node
+	protected DagConnection inConn; 		// the input of the node
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -103,8 +103,8 @@ public abstract class SingleInputNode extends OptimizerNode {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public SingleInputOperator<?, ?, ?> getPactContract() {
-		return (SingleInputOperator<?, ?, ?>) super.getPactContract();
+	public SingleInputOperator<?, ?, ?> getOperator() {
+		return (SingleInputOperator<?, ?, ?>) super.getOperator();
 	}
 	
 	/**
@@ -112,7 +112,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 	 * 
 	 * @return The input.
 	 */
-	public PactConnection getIncomingConnection() {
+	public DagConnection getIncomingConnection() {
 		return this.inConn;
 	}
 
@@ -121,7 +121,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 	 * 
 	 * @param inConn The input connection to set.
 	 */
-	public void setIncomingConnection(PactConnection inConn) {
+	public void setIncomingConnection(DagConnection inConn) {
 		this.inConn = inConn;
 	}
 	
@@ -139,14 +139,14 @@ public abstract class SingleInputNode extends OptimizerNode {
 	}
 
 	@Override
-	public List<PactConnection> getIncomingConnections() {
+	public List<DagConnection> getIncomingConnections() {
 		return Collections.singletonList(this.inConn);
 	}
 	
 
 	@Override
 	public SemanticProperties getSemanticProperties() {
-		return getPactContract().getSemanticProperties();
+		return getOperator().getSemanticProperties();
 	}
 	
 
@@ -155,18 +155,18 @@ public abstract class SingleInputNode extends OptimizerNode {
 			throws CompilerException
 	{
 		// see if an internal hint dictates the strategy to use
-		final Configuration conf = getPactContract().getParameters();
-		final String shipStrategy = conf.getString(PactCompiler.HINT_SHIP_STRATEGY, null);
+		final Configuration conf = getOperator().getParameters();
+		final String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
 		final ShipStrategyType preSet;
 		
 		if (shipStrategy != null) {
-			if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH)) {
+			if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH)) {
 				preSet = ShipStrategyType.PARTITION_HASH;
-			} else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) {
+			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE)) {
 				preSet = ShipStrategyType.PARTITION_RANGE;
-			} else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_FORWARD)) {
+			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_FORWARD)) {
 				preSet = ShipStrategyType.FORWARD;
-			} else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) {
+			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
 				preSet = ShipStrategyType.PARTITION_RANDOM;
 			} else {
 				throw new CompilerException("Unrecognized ship strategy hint: " + shipStrategy);
@@ -176,15 +176,15 @@ public abstract class SingleInputNode extends OptimizerNode {
 		}
 		
 		// get the predecessor node
-		Operator<?> children = ((SingleInputOperator<?, ?, ?>) getPactContract()).getInput();
+		Operator<?> children = ((SingleInputOperator<?, ?, ?>) getOperator()).getInput();
 		
 		OptimizerNode pred;
-		PactConnection conn;
+		DagConnection conn;
 		if (children == null) {
-			throw new CompilerException("Error: Node for '" + getPactContract().getName() + "' has no input.");
+			throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input.");
 		} else {
 			pred = contractToNode.get(children);
-			conn = new PactConnection(pred, this, defaultExchangeMode);
+			conn = new DagConnection(pred, this, defaultExchangeMode);
 			if (preSet != null) {
 				conn.setShipStrategy(preSet);
 			}
@@ -230,7 +230,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 		}
 		this.inConn.setInterestingProperties(props);
 		
-		for (PactConnection conn : getBroadcastConnections()) {
+		for (DagConnection conn : getBroadcastConnections()) {
 			conn.setInterestingProperties(new InterestingProperties());
 		}
 	}
@@ -251,11 +251,11 @@ public abstract class SingleInputNode extends OptimizerNode {
 		
 		// calculate alternative sub-plans for broadcast inputs
 		final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>();
-		List<PactConnection> broadcastConnections = getBroadcastConnections();
+		List<DagConnection> broadcastConnections = getBroadcastConnections();
 		List<String> broadcastConnectionNames = getBroadcastConnectionNames();
 
 		for (int i = 0; i < broadcastConnections.size(); i++ ) {
-			PactConnection broadcastConnection = broadcastConnections.get(i);
+			DagConnection broadcastConnection = broadcastConnections.get(i);
 			String broadcastConnectionName = broadcastConnectionNames.get(i);
 			List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator);
 
@@ -283,8 +283,8 @@ public abstract class SingleInputNode extends OptimizerNode {
 
 		final ExecutionMode executionMode = this.inConn.getDataExchangeMode();
 
-		final int dop = getDegreeOfParallelism();
-		final int inDop = getPredecessorNode().getDegreeOfParallelism();
+		final int dop = getParallelism();
+		final int inDop = getPredecessorNode().getParallelism();
 		final boolean dopChange = inDop != dop;
 
 		final boolean breaksPipeline = this.inConn.isBreakingPipeline();
@@ -509,7 +509,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 			} else {
 				throw new CompilerException();
 			}
-			for (PactConnection connection : getBroadcastConnections()) {
+			for (DagConnection connection : getBroadcastConnections()) {
 				connection.getSource().accept(visitor);
 			}
 			visitor.postVisit(this);

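[Editor's note] The hunk above shows how SingleInputNode.setInput() reads Optimizer.HINT_SHIP_STRATEGY from the operator's parameters and pre-sets a ship strategy on the new DagConnection. As a hedged illustration only (not part of this commit), a hint could be attached to an operator roughly as sketched below; it assumes Operator#getParameters(), Configuration#setString(String, String), and the public HINT_* constants behave as referenced in the diff.

// Illustrative sketch only -- not part of this commit.
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;

public class ShipStrategyHintSketch {

	// Attaches a hash-repartition hint; SingleInputNode.setInput() would then
	// pre-set ShipStrategyType.PARTITION_HASH on the DagConnection it creates
	// to the predecessor node.
	public static void hintHashRepartition(Operator<?> operator) {
		Configuration params = operator.getParameters();
		params.setString(Optimizer.HINT_SHIP_STRATEGY,
				Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
	}
}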
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
index 2d65b4d..40725ba 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SinkJoiner.java
@@ -31,8 +31,9 @@ import org.apache.flink.optimizer.util.NoOpBinaryUdfOp;
 import org.apache.flink.types.Nothing;
 
 /**
- * This class represents a utility node that is not part of the actual plan. It is used for plans with multiple data sinks to
- * transform it into a plan with a single root node. That way, the code that makes sure no costs are double-counted and that 
+ * This class represents a utility node that is not part of the actual plan.
+ * It is used for plans with multiple data sinks, to transform such a plan into one with
+ * a single root node. That way, the code that makes sure no costs are double-counted and that
  * candidate selection works correctly with nodes that have multiple outputs is transparently reused.
  */
 public class SinkJoiner extends TwoInputNode {
@@ -40,8 +41,8 @@ public class SinkJoiner extends TwoInputNode {
 	public SinkJoiner(OptimizerNode input1, OptimizerNode input2) {
 		super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo()));
 
-		PactConnection conn1 = new PactConnection(input1, this, null, ExecutionMode.PIPELINED);
-		PactConnection conn2 = new PactConnection(input2, this, null, ExecutionMode.PIPELINED);
+		DagConnection conn1 = new DagConnection(input1, this, null, ExecutionMode.PIPELINED);
+		DagConnection conn2 = new DagConnection(input2, this, null, ExecutionMode.PIPELINED);
 		
 		this.input1 = conn1;
 		this.input2 = conn2;
@@ -55,7 +56,7 @@ public class SinkJoiner extends TwoInputNode {
 	}
 	
 	@Override
-	public List<PactConnection> getOutgoingConnections() {
+	public List<DagConnection> getOutgoingConnections() {
 		return Collections.emptyList();
 	}
 	

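[Editor's note] The class comment above explains why multi-sink plans get a synthetic single root. The hypothetical helper below (not from the commit) merely illustrates that wiring; it assumes SinkJoiner stays publicly constructible in org.apache.flink.optimizer.dag, as the diff suggests.

// Hypothetical helper, shown only to illustrate the comment above.
import org.apache.flink.optimizer.dag.OptimizerNode;
import org.apache.flink.optimizer.dag.SinkJoiner;

public class SingleRootSketch {

	// SinkJoiner's constructor (see diff above) wires PIPELINED DagConnections
	// from both inputs to the joiner, giving the plan a single root for
	// cost accounting and candidate selection.
	public static OptimizerNode rootOf(OptimizerNode sinkA, OptimizerNode sinkB) {
		return new SinkJoiner(sinkA, sinkB);
	}
}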
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
index df47e56..1292cf5 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SolutionSetNode.java
@@ -46,7 +46,7 @@ public class SolutionSetNode extends AbstractPartialSolutionNode {
 	// --------------------------------------------------------------------------------------------
 	
 	public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
-		this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this, "SolutionSet("+this.getPactContract().getName()+")", gProps, lProps, initialInput));
+		this.cachedPlans = Collections.<PlanNode>singletonList(new SolutionSetPlanNode(this, "SolutionSet("+this.getOperator().getName()+")", gProps, lProps, initialInput));
 	}
 	
 	public SolutionSetPlanNode getCurrentSolutionSetPlanNode() {
@@ -74,8 +74,8 @@ public class SolutionSetNode extends AbstractPartialSolutionNode {
 	 * @return The contract.
 	 */
 	@Override
-	public SolutionSetPlaceHolder<?> getPactContract() {
-		return (SolutionSetPlaceHolder<?>) super.getPactContract();
+	public SolutionSetPlaceHolder<?> getOperator() {
+		return (SolutionSetPlaceHolder<?>) super.getOperator();
 	}
 
 	@Override
@@ -89,7 +89,7 @@ public class SolutionSetNode extends AbstractPartialSolutionNode {
 			return;
 		}
 
-		PactConnection solutionSetInput = this.iterationNode.getFirstIncomingConnection();
+		DagConnection solutionSetInput = this.iterationNode.getFirstIncomingConnection();
 		OptimizerNode solutionSetSource = solutionSetInput.getSource();
 		
 		addClosedBranches(solutionSetSource.closedBranchingNodes);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
index 17bc8f1..83bc39a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/SortPartitionNode.java
@@ -51,8 +51,8 @@ public class SortPartitionNode extends SingleInputNode {
 	}
 
 	@Override
-	public SortPartitionOperatorBase<?> getPactContract() {
-		return (SortPartitionOperatorBase<?>) super.getPactContract();
+	public SortPartitionOperatorBase<?> getOperator() {
+		return (SortPartitionOperatorBase<?>) super.getOperator();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
index d51f2de..39da165 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/TwoInputNode.java
@@ -36,7 +36,7 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.PactCompiler;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
 import org.apache.flink.optimizer.dataproperties.InterestingProperties;
@@ -70,9 +70,9 @@ public abstract class TwoInputNode extends OptimizerNode {
 	
 	protected final FieldList keys2; // The set of key fields for the second input
 	
-	protected PactConnection input1; // The first input edge
+	protected DagConnection input1; // The first input edge
 
-	protected PactConnection input2; // The second input edge
+	protected DagConnection input2; // The second input edge
 	
 	private List<OperatorDescriptorDual> cachedDescriptors;
 	
@@ -109,8 +109,8 @@ public abstract class TwoInputNode extends OptimizerNode {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public DualInputOperator<?, ?, ?, ?> getPactContract() {
-		return (DualInputOperator<?, ?, ?, ?>) super.getPactContract();
+	public DualInputOperator<?, ?, ?, ?> getOperator() {
+		return (DualInputOperator<?, ?, ?, ?>) super.getOperator();
 	}
 
 	/**
@@ -118,7 +118,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 	 * 
 	 * @return The first input connection.
 	 */
-	public PactConnection getFirstIncomingConnection() {
+	public DagConnection getFirstIncomingConnection() {
 		return this.input1;
 	}
 
@@ -127,7 +127,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 	 * 
 	 * @return The second input connection.
 	 */
-	public PactConnection getSecondIncomingConnection() {
+	public DagConnection getSecondIncomingConnection() {
 		return this.input2;
 	}
 	
@@ -148,8 +148,8 @@ public abstract class TwoInputNode extends OptimizerNode {
 	}
 
 	@Override
-	public List<PactConnection> getIncomingConnections() {
-		ArrayList<PactConnection> inputs = new ArrayList<PactConnection>(2);
+	public List<DagConnection> getIncomingConnections() {
+		ArrayList<DagConnection> inputs = new ArrayList<DagConnection>(2);
 		inputs.add(input1);
 		inputs.add(input2);
 		return inputs;
@@ -159,21 +159,21 @@ public abstract class TwoInputNode extends OptimizerNode {
 	@Override
 	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExecutionMode) {
 		// see if there is a hint that dictates which shipping strategy to use for BOTH inputs
-		final Configuration conf = getPactContract().getParameters();
+		final Configuration conf = getOperator().getParameters();
 		ShipStrategyType preSet1 = null;
 		ShipStrategyType preSet2 = null;
 		
-		String shipStrategy = conf.getString(PactCompiler.HINT_SHIP_STRATEGY, null);
+		String shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY, null);
 		if (shipStrategy != null) {
-			if (PactCompiler.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
+			if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
 				preSet1 = preSet2 = ShipStrategyType.FORWARD;
-			} else if (PactCompiler.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
+			} else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
 				preSet1 = preSet2 = ShipStrategyType.BROADCAST;
-			} else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
+			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
 				preSet1 = preSet2 = ShipStrategyType.PARTITION_HASH;
-			} else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
+			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
 				preSet1 = preSet2 = ShipStrategyType.PARTITION_RANGE;
-			} else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) {
+			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
 				preSet1 = preSet2 = ShipStrategyType.PARTITION_RANDOM;
 			} else {
 				throw new CompilerException("Unknown hint for shipping strategy: " + shipStrategy);
@@ -181,17 +181,17 @@ public abstract class TwoInputNode extends OptimizerNode {
 		}
 
 		// see if there is a hint that dictates which shipping strategy to use for the FIRST input
-		shipStrategy = conf.getString(PactCompiler.HINT_SHIP_STRATEGY_FIRST_INPUT, null);
+		shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT, null);
 		if (shipStrategy != null) {
-			if (PactCompiler.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
+			if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
 				preSet1 = ShipStrategyType.FORWARD;
-			} else if (PactCompiler.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
+			} else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
 				preSet1 = ShipStrategyType.BROADCAST;
-			} else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
+			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
 				preSet1 = ShipStrategyType.PARTITION_HASH;
-			} else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
+			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
 				preSet1 = ShipStrategyType.PARTITION_RANGE;
-			} else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) {
+			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
 				preSet1 = ShipStrategyType.PARTITION_RANDOM;
 			} else {
 				throw new CompilerException("Unknown hint for shipping strategy of input one: " + shipStrategy);
@@ -199,17 +199,17 @@ public abstract class TwoInputNode extends OptimizerNode {
 		}
 
 		// see if there is a hint that dictates which shipping strategy to use for the SECOND input
-		shipStrategy = conf.getString(PactCompiler.HINT_SHIP_STRATEGY_SECOND_INPUT, null);
+		shipStrategy = conf.getString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT, null);
 		if (shipStrategy != null) {
-			if (PactCompiler.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
+			if (Optimizer.HINT_SHIP_STRATEGY_FORWARD.equals(shipStrategy)) {
 				preSet2 = ShipStrategyType.FORWARD;
-			} else if (PactCompiler.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
+			} else if (Optimizer.HINT_SHIP_STRATEGY_BROADCAST.equals(shipStrategy)) {
 				preSet2 = ShipStrategyType.BROADCAST;
-			} else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
+			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH.equals(shipStrategy)) {
 				preSet2 = ShipStrategyType.PARTITION_HASH;
-			} else if (PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
+			} else if (Optimizer.HINT_SHIP_STRATEGY_REPARTITION_RANGE.equals(shipStrategy)) {
 				preSet2 = ShipStrategyType.PARTITION_RANGE;
-			} else if (shipStrategy.equalsIgnoreCase(PactCompiler.HINT_SHIP_STRATEGY_REPARTITION)) {
+			} else if (shipStrategy.equalsIgnoreCase(Optimizer.HINT_SHIP_STRATEGY_REPARTITION)) {
 				preSet2 = ShipStrategyType.PARTITION_RANDOM;
 			} else {
 				throw new CompilerException("Unknown hint for shipping strategy of input two: " + shipStrategy);
@@ -217,18 +217,18 @@ public abstract class TwoInputNode extends OptimizerNode {
 		}
 		
 		// get the predecessors
-		DualInputOperator<?, ?, ?, ?> contr = getPactContract();
+		DualInputOperator<?, ?, ?, ?> contr = getOperator();
 		
 		Operator<?> leftPred = contr.getFirstInput();
 		Operator<?> rightPred = contr.getSecondInput();
 		
 		OptimizerNode pred1;
-		PactConnection conn1;
+		DagConnection conn1;
 		if (leftPred == null) {
-			throw new CompilerException("Error: Node for '" + getPactContract().getName() + "' has no input set for first input.");
+			throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input set for first input.");
 		} else {
 			pred1 = contractToNode.get(leftPred);
-			conn1 = new PactConnection(pred1, this, defaultExecutionMode);
+			conn1 = new DagConnection(pred1, this, defaultExecutionMode);
 			if (preSet1 != null) {
 				conn1.setShipStrategy(preSet1);
 			}
@@ -239,12 +239,12 @@ public abstract class TwoInputNode extends OptimizerNode {
 		pred1.addOutgoingConnection(conn1);
 		
 		OptimizerNode pred2;
-		PactConnection conn2;
+		DagConnection conn2;
 		if (rightPred == null) {
-			throw new CompilerException("Error: Node for '" + getPactContract().getName() + "' has no input set for second input.");
+			throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input set for second input.");
 		} else {
 			pred2 = contractToNode.get(rightPred);
-			conn2 = new PactConnection(pred2, this, defaultExecutionMode);
+			conn2 = new DagConnection(pred2, this, defaultExecutionMode);
 			if (preSet2 != null) {
 				conn2.setShipStrategy(preSet2);
 			}
@@ -290,7 +290,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 		this.input1.setInterestingProperties(props1);
 		this.input2.setInterestingProperties(props2);
 		
-		for (PactConnection conn : getBroadcastConnections()) {
+		for (DagConnection conn : getBroadcastConnections()) {
 			conn.setInterestingProperties(new InterestingProperties());
 		}
 	}
@@ -314,11 +314,11 @@ public abstract class TwoInputNode extends OptimizerNode {
 		
 		// calculate alternative sub-plans for broadcast inputs
 		final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>();
-		List<PactConnection> broadcastConnections = getBroadcastConnections();
+		List<DagConnection> broadcastConnections = getBroadcastConnections();
 		List<String> broadcastConnectionNames = getBroadcastConnectionNames();
 
 		for (int i = 0; i < broadcastConnections.size(); i++ ) {
-			PactConnection broadcastConnection = broadcastConnections.get(i);
+			DagConnection broadcastConnection = broadcastConnections.get(i);
 			String broadcastConnectionName = broadcastConnectionNames.get(i);
 			List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator);
 
@@ -352,9 +352,9 @@ public abstract class TwoInputNode extends OptimizerNode {
 		final ExecutionMode input1Mode = this.input1.getDataExchangeMode();
 		final ExecutionMode input2Mode = this.input2.getDataExchangeMode();
 
-		final int dop = getDegreeOfParallelism();
-		final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism();
-		final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism();
+		final int dop = getParallelism();
+		final int inDop1 = getFirstPredecessorNode().getParallelism();
+		final int inDop2 = getSecondPredecessorNode().getParallelism();
 
 		final boolean dopChange1 = dop != inDop1;
 		final boolean dopChange2 = dop != inDop2;
@@ -720,7 +720,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 
 	@Override
 	public SemanticProperties getSemanticProperties() {
-		return getPactContract().getSemanticProperties();
+		return getOperator().getSemanticProperties();
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -737,7 +737,7 @@ public abstract class TwoInputNode extends OptimizerNode {
 			getFirstPredecessorNode().accept(visitor);
 			getSecondPredecessorNode().accept(visitor);
 			
-			for (PactConnection connection : getBroadcastConnections()) {
+			for (DagConnection connection : getBroadcastConnections()) {
 				connection.getSource().accept(visitor);
 			}
 			

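[Editor's note] TwoInputNode.setInput() above applies the same hint mechanism per input, via Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT and HINT_SHIP_STRATEGY_SECOND_INPUT. A hedged sketch (again assuming the Operator/Configuration API and public constants shown in the diff), broadcasting the first input and hash-partitioning the second:

// Illustrative sketch only -- not part of this commit.
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;

public class TwoInputHintSketch {

	// TwoInputNode.setInput() would translate these hints into BROADCAST for
	// the first DagConnection and PARTITION_HASH for the second.
	public static void hintPerInput(Operator<?> twoInputOperator) {
		Configuration params = twoInputOperator.getParameters();
		params.setString(Optimizer.HINT_SHIP_STRATEGY_FIRST_INPUT,
				Optimizer.HINT_SHIP_STRATEGY_BROADCAST);
		params.setString(Optimizer.HINT_SHIP_STRATEGY_SECOND_INPUT,
				Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
	}
}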
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
index 2ec36b1..71a49f3 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.PactCompiler.InterestingPropertyVisitor;
+import org.apache.flink.optimizer.Optimizer.InterestingPropertyVisitor;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
 import org.apache.flink.optimizer.dataproperties.InterestingProperties;
@@ -80,9 +80,9 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 	
 	private OptimizerNode nextWorkset;
 	
-	private PactConnection solutionSetDeltaRootConnection;
+	private DagConnection solutionSetDeltaRootConnection;
 	
-	private PactConnection nextWorksetRootConnection;
+	private DagConnection nextWorksetRootConnection;
 	
 	private SingleRootJoiner singleRoot;
 	
@@ -122,7 +122,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 	// --------------------------------------------------------------------------------------------
 	
 	public DeltaIterationBase<?, ?> getIterationContract() {
-		return (DeltaIterationBase<?, ?>) getPactContract();
+		return (DeltaIterationBase<?, ?>) getOperator();
 	}
 	
 	public SolutionSetNode getSolutionSetNode() {
@@ -167,9 +167,9 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		// if the next workset is equal to the workset, we need to inject a no-op node
 		if (nextWorkset == worksetNode || nextWorkset instanceof BinaryUnionNode) {
 			NoOpNode noop = new NoOpNode();
-			noop.setDegreeOfParallelism(getDegreeOfParallelism());
+			noop.setDegreeOfParallelism(getParallelism());
 
-			PactConnection noOpConn = new PactConnection(nextWorkset, noop, executionMode);
+			DagConnection noOpConn = new DagConnection(nextWorkset, noop, executionMode);
 			noop.setIncomingConnection(noOpConn);
 			nextWorkset.addOutgoingConnection(noOpConn);
 			
@@ -179,9 +179,9 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		// attach an extra node to the solution set delta for the cases where we need to repartition
 		UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(),
 				new SolutionSetDeltaOperator(getSolutionSetKeyFields()));
-		solutionSetDeltaUpdateAux.setDegreeOfParallelism(getDegreeOfParallelism());
+		solutionSetDeltaUpdateAux.setDegreeOfParallelism(getParallelism());
 
-		PactConnection conn = new PactConnection(solutionSetDelta, solutionSetDeltaUpdateAux, executionMode);
+		DagConnection conn = new DagConnection(solutionSetDelta, solutionSetDeltaUpdateAux, executionMode);
 		solutionSetDeltaUpdateAux.setIncomingConnection(conn);
 		solutionSetDelta.addOutgoingConnection(conn);
 		
@@ -189,10 +189,10 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 		this.nextWorkset = nextWorkset;
 		
 		this.singleRoot = new SingleRootJoiner();
-		this.solutionSetDeltaRootConnection = new PactConnection(solutionSetDeltaUpdateAux,
+		this.solutionSetDeltaRootConnection = new DagConnection(solutionSetDeltaUpdateAux,
 													this.singleRoot, executionMode);
 
-		this.nextWorksetRootConnection = new PactConnection(nextWorkset, this.singleRoot, executionMode);
+		this.nextWorksetRootConnection = new DagConnection(nextWorkset, this.singleRoot, executionMode);
 		this.singleRoot.setInputs(this.solutionSetDeltaRootConnection, this.nextWorksetRootConnection);
 		
 		solutionSetDeltaUpdateAux.addOutgoingConnection(this.solutionSetDeltaRootConnection);
@@ -371,7 +371,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 					UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties",
 																							FieldList.EMPTY_LIST);
 					
-					rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
+					rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
 					
 					SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(
 												rebuildWorksetPropertiesNode, "Rebuild Workset Properties",
@@ -454,7 +454,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 					}
 					
 					WorksetIterationPlanNode wsNode = new WorksetIterationPlanNode(this,
-							"WorksetIteration ("+this.getPactContract().getName()+")", solutionSetIn,
+							"WorksetIteration ("+this.getOperator().getName()+")", solutionSetIn,
 							worksetIn, sspn, wspn, worksetCandidate, solutionSetCandidate);
 					wsNode.setImmediateSolutionSetUpdate(immediateDeltaUpdate);
 					wsNode.initProperties(gp, lp);
@@ -566,7 +566,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode
 			setDegreeOfParallelism(1);
 		}
 		
-		public void setInputs(PactConnection input1, PactConnection input2) {
+		public void setInputs(DagConnection input1, DagConnection input2) {
 			this.input1 = input1;
 			this.input2 = input2;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
index bd39858..3b05aba 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
@@ -49,7 +49,7 @@ public class WorksetNode extends AbstractPartialSolutionNode {
 		if (this.cachedPlans != null) {
 			throw new IllegalStateException();
 		} else {
-			WorksetPlanNode wspn = new WorksetPlanNode(this, "Workset ("+this.getPactContract().getName()+")", gProps, lProps, initialInput);
+			WorksetPlanNode wspn = new WorksetPlanNode(this, "Workset ("+this.getOperator().getName()+")", gProps, lProps, initialInput);
 			this.cachedPlans = Collections.<PlanNode>singletonList(wspn);
 		}
 	}
@@ -79,8 +79,8 @@ public class WorksetNode extends AbstractPartialSolutionNode {
 	 * @return The contract.
 	 */
 	@Override
-	public WorksetPlaceHolder<?> getPactContract() {
-		return (WorksetPlaceHolder<?>) super.getPactContract();
+	public WorksetPlaceHolder<?> getOperator() {
+		return (WorksetPlaceHolder<?>) super.getOperator();
 	}
 
 	@Override
@@ -94,7 +94,7 @@ public class WorksetNode extends AbstractPartialSolutionNode {
 			return;
 		}
 
-		PactConnection worksetInput = this.iterationNode.getSecondIncomingConnection();
+		DagConnection worksetInput = this.iterationNode.getSecondIncomingConnection();
 		OptimizerNode worksetSource = worksetInput.getSource();
 		
 		addClosedBranches(worksetSource.closedBranchingNodes);

http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
index a547f04..57ba29d 100644
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
@@ -185,11 +185,7 @@ public class GlobalProperties implements Cloneable {
 	}
 
 	public boolean isExactlyPartitionedOnFields(FieldList fields) {
-		if (this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields)) {
-			return true;
-		} else {
-			return false;
-		}
+		return this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields);
 	}
 	
 	public boolean matchesOrderedPartitioning(Ordering o) {

