flink-commits mailing list archives

From se...@apache.org
Subject [19/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
Date Fri, 20 Mar 2015 10:06:58 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
new file mode 100644
index 0000000..55b8785
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.dag.WorksetIterationNode.SingleRootJoiner;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.operators.NoOpDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.NamedChannel;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A node in the optimizer's program representation for a bulk iteration.
+ */
+public class BulkIterationNode extends SingleInputNode implements IterationNode {
+	
+	private BulkPartialSolutionNode partialSolution;
+	
+	private OptimizerNode terminationCriterion;
+	
+	private OptimizerNode nextPartialSolution;
+	
+	private DagConnection rootConnection;		// connection out of the next partial solution
+	
+	private DagConnection terminationCriterionRootConnection;	// connection out of the term. criterion
+	
+	private OptimizerNode singleRoot;
+	
+	private final int costWeight;
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a new node for the bulk iteration.
+	 * 
+	 * @param iteration The bulk iteration the node represents.
+	 */
+	public BulkIterationNode(BulkIterationBase<?> iteration) {
+		super(iteration);
+		
+		if (iteration.getMaximumNumberOfIterations() <= 0) {
+			throw new CompilerException("BulkIteration must have a maximum number of iterations specified.");
+		}
+		
+		int numIters = iteration.getMaximumNumberOfIterations();
+		
+		this.costWeight = (numIters > 0 && numIters < OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT) ?
+			numIters : OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT; 
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public BulkIterationBase<?> getIterationContract() {
+		return (BulkIterationBase<?>) getOperator();
+	}
+	
+	/**
+	 * Gets the partialSolution from this BulkIterationNode.
+	 *
+	 * @return The partialSolution.
+	 */
+	public BulkPartialSolutionNode getPartialSolution() {
+		return partialSolution;
+	}
+	
+	/**
+	 * Sets the partialSolution for this BulkIterationNode.
+	 *
+	 * @param partialSolution The partialSolution to set.
+	 */
+	public void setPartialSolution(BulkPartialSolutionNode partialSolution) {
+		this.partialSolution = partialSolution;
+	}
+
+	
+	/**
+	 * Gets the nextPartialSolution from this BulkIterationNode.
+	 *
+	 * @return The nextPartialSolution.
+	 */
+	public OptimizerNode getNextPartialSolution() {
+		return nextPartialSolution;
+	}
+	
+	/**
+	 * Sets the nextPartialSolution for this BulkIterationNode.
+	 *
+	 * @param nextPartialSolution The nextPartialSolution to set.
+	 */
+	public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) {
+		
+		// check whether the root of the step function has the same parallelism as the iteration,
+		// whether the step function has no operator of its own at all, or whether its root is a binary union
+		if (nextPartialSolution.getParallelism() != getParallelism() ||
+			nextPartialSolution == partialSolution || nextPartialSolution instanceof BinaryUnionNode)
+		{
+			// add a no-op to the root to express the re-partitioning
+			NoOpNode noop = new NoOpNode();
+			noop.setDegreeOfParallelism(getParallelism());
+
+			DagConnection noOpConn = new DagConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
+			noop.setIncomingConnection(noOpConn);
+			nextPartialSolution.addOutgoingConnection(noOpConn);
+			
+			nextPartialSolution = noop;
+		}
+		
+		this.nextPartialSolution = nextPartialSolution;
+		this.terminationCriterion = terminationCriterion;
+		
+		if (terminationCriterion == null) {
+			this.singleRoot = nextPartialSolution;
+			this.rootConnection = new DagConnection(nextPartialSolution, ExecutionMode.PIPELINED);
+		}
+		else {
+			// we have a termination criterion
+			SingleRootJoiner singleRootJoiner = new SingleRootJoiner();
+			this.rootConnection = new DagConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED);
+			this.terminationCriterionRootConnection = new DagConnection(terminationCriterion, singleRootJoiner,
+																		ExecutionMode.PIPELINED);
+
+			singleRootJoiner.setInputs(this.rootConnection, this.terminationCriterionRootConnection);
+			
+			this.singleRoot = singleRootJoiner;
+			
+			// add connection to terminationCriterion for interesting properties visitor
+			terminationCriterion.addOutgoingConnection(terminationCriterionRootConnection);
+		
+		}
+		
+		nextPartialSolution.addOutgoingConnection(rootConnection);
+	}
+	
+	public int getCostWeight() {
+		return this.costWeight;
+	}
+	
+	public OptimizerNode getSingleRootOfStepFunction() {
+		return this.singleRoot;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public String getName() {
+		return "Bulk Iteration";
+	}
+
+	@Override
+	public SemanticProperties getSemanticProperties() {
+		return new EmptySemanticProperties();
+	}
+	
+	protected void readStubAnnotations() {}
+	
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
+		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                             Properties and Optimization
+	// --------------------------------------------------------------------------------------------
+	
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return Collections.<OperatorDescriptorSingle>singletonList(new NoOpDescriptor());
+	}
+
+	@Override
+	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
+		final InterestingProperties intProps = getInterestingProperties().clone();
+		
+		if (this.terminationCriterion != null) {
+			// first propagate through the termination criterion. Since it has no successors, it has no
+			// interesting properties
+			this.terminationCriterionRootConnection.setInterestingProperties(new InterestingProperties());
+			this.terminationCriterion.accept(new InterestingPropertyVisitor(estimator));
+		}
+		
+		// we need to make two interesting-property passes, because the root of the step function also
+		// needs the interesting properties as generated by the partial solution
+		
+		// give our own interesting properties (as generated by the iteration's successors) to the step
+		// function and make the first pass
+		this.rootConnection.setInterestingProperties(intProps);
+		this.nextPartialSolution.accept(new InterestingPropertyVisitor(estimator));
+		
+		// take the interesting properties of the partial solution and add them to the root interesting properties
+		InterestingProperties partialSolutionIntProps = this.partialSolution.getInterestingProperties();
+		intProps.getGlobalProperties().addAll(partialSolutionIntProps.getGlobalProperties());
+		intProps.getLocalProperties().addAll(partialSolutionIntProps.getLocalProperties());
+		
+		// clear all interesting properties to prepare the second traversal
+		// this clears only the path down from the next partial solution. The paths down
+		// from the termination criterion (before they meet the paths down from the next partial solution)
+		// remain unaffected by this step
+		this.rootConnection.clearInterestingProperties();
+		this.nextPartialSolution.accept(InterestingPropertiesClearer.INSTANCE);
+		
+		// 2nd pass
+		this.rootConnection.setInterestingProperties(intProps);
+		this.nextPartialSolution.accept(new InterestingPropertyVisitor(estimator));
+		
+		// now add the interesting properties of the partial solution to the input
+		final InterestingProperties inProps = this.partialSolution.getInterestingProperties().clone();
+		inProps.addGlobalProperties(new RequestedGlobalProperties());
+		inProps.addLocalProperties(new RequestedLocalProperties());
+		this.inConn.setInterestingProperties(inProps);
+	}
+	
+	@Override
+	public void clearInterestingProperties() {
+		super.clearInterestingProperties();
+		
+		this.singleRoot.accept(InterestingPropertiesClearer.INSTANCE);
+		this.rootConnection.clearInterestingProperties();
+	}
+
+	@Override
+	public void computeUnclosedBranchStack() {
+		if (this.openBranches != null) {
+			return;
+		}
+
+		// the resulting branches are those of the step function
+		// because the BulkPartialSolution takes the input's branches
+		addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes);
+		List<UnclosedBranchDescriptor> result = getSingleRootOfStepFunction().openBranches;
+
+		this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
+	}
+
+
+	@Override
+	protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels, 
+			List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
+	{
+		// NOTES ON THE ENUMERATION OF THE STEP FUNCTION PLANS:
+		// Whenever we instantiate the iteration, we enumerate new candidates for the step function.
+		// That way, we make sure that for each candidate for the initial partial solution, we have
+		// a fitting candidate for the step function (often, work is pushed out of the step function).
+		// Among the candidates of the step function, we keep only those that meet the requested properties of the
+		// current candidate for the initial partial solution. That makes sure these properties exist at the
+		// beginning of the next iteration.
+		
+		// 1) Because we enumerate multiple times, we may need to clean the cached plans
+		//    before starting another enumeration
+		this.nextPartialSolution.accept(PlanCacheCleaner.INSTANCE);
+		if (this.terminationCriterion != null) {
+			this.terminationCriterion.accept(PlanCacheCleaner.INSTANCE);
+		}
+		
+		// 2) Give the partial solution the properties of the current candidate for the initial partial solution
+		this.partialSolution.setCandidateProperties(in.getGlobalProperties(), in.getLocalProperties(), in);
+		final BulkPartialSolutionPlanNode pspn = this.partialSolution.getCurrentPartialSolutionPlanNode();
+		
+		// 3) Get the alternative plans
+		List<PlanNode> candidates = this.nextPartialSolution.getAlternativePlans(estimator);
+		
+		// 4) Make sure that the beginning of the step function does not assume properties that 
+		//    are not also produced by the end of the step function.
+
+		{
+			List<PlanNode> newCandidates = new ArrayList<PlanNode>();
+			
+			for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
+				PlanNode candidate = planDeleter.next();
+				
+				GlobalProperties atEndGlobal = candidate.getGlobalProperties();
+				LocalProperties atEndLocal = candidate.getLocalProperties();
+				
+				FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobal, atEndLocal);
+				if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
+					// the candidate depends on the partial solution only through a broadcast
+					// variable, so there is nothing to do
+				}
+				else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+					// attach a no-op node through which we create the properties of the original input
+					Channel toNoOp = new Channel(candidate);
+					globPropsReq.parameterizeChannel(toNoOp, false, rootConnection.getDataExchangeMode(), false);
+					locPropsReq.parameterizeChannel(toNoOp);
+					
+					UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
+					rebuildPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
+					
+					SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
+					rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
+					estimator.costOperator(rebuildPropertiesPlanNode);
+						
+					GlobalProperties atEndGlobalModified = rebuildPropertiesPlanNode.getGlobalProperties();
+					LocalProperties atEndLocalModified = rebuildPropertiesPlanNode.getLocalProperties();
+						
+					if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
+						FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobalModified, atEndLocalModified);
+						
+						if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
+							newCandidates.add(rebuildPropertiesPlanNode);
+						}
+					}
+					
+					planDeleter.remove();
+				}
+			}
+		}
+		
+		if (candidates.isEmpty()) {
+			return;
+		}
+		
+		// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
+		if (terminationCriterion == null) {
+			for (PlanNode candidate : candidates) {
+				BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate);
+				GlobalProperties gProps = candidate.getGlobalProperties().clone();
+				LocalProperties lProps = candidate.getLocalProperties().clone();
+				node.initProperties(gProps, lProps);
+				target.add(node);
+			}
+		}
+		else {
+			List<PlanNode> terminationCriterionCandidates = this.terminationCriterion.getAlternativePlans(estimator);
+
+			SingleRootJoiner singleRoot = (SingleRootJoiner) this.singleRoot;
+			
+			for (PlanNode candidate : candidates) {
+				for (PlanNode terminationCandidate : terminationCriterionCandidates) {
+					if (singleRoot.areBranchCompatible(candidate, terminationCandidate)) {
+						BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate, terminationCandidate);
+						GlobalProperties gProps = candidate.getGlobalProperties().clone();
+						LocalProperties lProps = candidate.getLocalProperties().clone();
+						node.initProperties(gProps, lProps);
+						target.add(node);
+						
+					}
+				}
+			}
+			
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                      Iteration Specific Traversals
+	// --------------------------------------------------------------------------------------------
+
+	public void acceptForStepFunction(Visitor<OptimizerNode> visitor) {
+		this.singleRoot.accept(visitor);
+	}
+}

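The cost-weight clamping in the BulkIterationNode constructor above is worth seeing in isolation. Below is a minimal, self-contained sketch of the same rule; the class name and the value of the bound are illustrative stand-ins, not Flink's actual API.

    public class CostWeightSketch {

        // Illustrative stand-in for OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT;
        // the real constant lives in the optimizer and its value may differ.
        static final int MAX_DYNAMIC_PATH_COST_WEIGHT = 100;

        // Mirrors the constructor logic: the configured maximum number of
        // iterations becomes the dynamic-path cost weight, capped at the bound.
        static int costWeight(int maxNumberOfIterations) {
            if (maxNumberOfIterations <= 0) {
                throw new IllegalArgumentException(
                        "BulkIteration must have a maximum number of iterations specified.");
            }
            return Math.min(maxNumberOfIterations, MAX_DYNAMIC_PATH_COST_WEIGHT);
        }

        public static void main(String[] args) {
            System.out.println(costWeight(10));    // prints 10
            System.out.println(costWeight(10000)); // prints 100 (capped)
        }
    }
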
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
new file mode 100644
index 0000000..25a7eef
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.PlanNode;
+
+/**
+ * The optimizer's internal representation of the partial solution that is input to a bulk iteration.
+ */
+public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
+	
+	private final BulkIterationNode iterationNode;
+	
+	
+	public BulkPartialSolutionNode(PartialSolutionPlaceHolder<?> psph, BulkIterationNode iterationNode) {
+		super(psph);
+		this.iterationNode = iterationNode;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
+		if (this.cachedPlans != null) {
+			throw new IllegalStateException();
+		} else {
+			this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this,
+					"PartialSolution ("+this.getOperator().getName()+")", gProps, lProps, initialInput));
+		}
+	}
+	
+	public BulkPartialSolutionPlanNode getCurrentPartialSolutionPlanNode() {
+		if (this.cachedPlans != null) {
+			return (BulkPartialSolutionPlanNode) this.cachedPlans.get(0);
+		} else {
+			throw new IllegalStateException();
+		}
+	}
+	
+	public BulkIterationNode getIterationNode() {
+		return this.iterationNode;
+	}
+	
+	@Override
+	public void computeOutputEstimates(DataStatistics statistics) {
+		copyEstimates(this.iterationNode.getPredecessorNode());
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the operator (here the {@link PartialSolutionPlaceHolder}) that is represented by this
+	 * optimizer node.
+	 * 
+	 * @return The operator represented by this optimizer node.
+	 */
+	@Override
+	public PartialSolutionPlaceHolder<?> getOperator() {
+		return (PartialSolutionPlaceHolder<?>) super.getOperator();
+	}
+
+	@Override
+	public String getName() {
+		return "Bulk Partial Solution";
+	}
+
+	@Override
+	public void computeUnclosedBranchStack() {
+		if (this.openBranches != null) {
+			return;
+		}
+
+		OptimizerNode inputToIteration = this.iterationNode.getPredecessorNode();
+		
+		addClosedBranches(inputToIteration.closedBranchingNodes);
+		List<UnclosedBranchDescriptor> fromInput = inputToIteration.getBranchesForParent(this.iterationNode.getIncomingConnection());
+		this.openBranches = (fromInput == null || fromInput.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput;
+	}
+}

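BulkPartialSolutionNode enforces a strict set-once / read-after-set protocol on its single cached plan candidate. Here is a minimal sketch of that protocol with placeholder types; nothing below is the real optimizer API.

    import java.util.Collections;
    import java.util.List;

    public class SingleCandidateSketch<T> {

        private List<T> cachedPlans;

        // Mirrors setCandidateProperties(): a candidate may be installed
        // only once per enumeration round.
        public void setCandidate(T candidate) {
            if (this.cachedPlans != null) {
                throw new IllegalStateException("Candidate already set.");
            }
            this.cachedPlans = Collections.singletonList(candidate);
        }

        // Mirrors getCurrentPartialSolutionPlanNode(): reading before a
        // candidate has been installed is a programming error.
        public T getCandidate() {
            if (this.cachedPlans == null) {
                throw new IllegalStateException("No candidate set.");
            }
            return this.cachedPlans.get(0);
        }
    }
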
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
new file mode 100644
index 0000000..92076c3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.CoGroupDescriptor;
+import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetFirstDescriptor;
+import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetSecondDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+
+/**
+ * The Optimizer representation of a <i>CoGroup</i> operator.
+ */
+public class CoGroupNode extends TwoInputNode {
+	
+	private List<OperatorDescriptorDual> dataProperties;
+	
+	public CoGroupNode(CoGroupOperatorBase<?, ?, ?, ?> operator) {
+		super(operator);
+		this.dataProperties = initializeDataProperties(operator.getCustomPartitioner());
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the operator for this CoGroup node.
+	 * 
+	 * @return The CoGroup operator.
+	 */
+	@Override
+	public CoGroupOperatorBase<?, ?, ?, ?> getOperator() {
+		return (CoGroupOperatorBase<?, ?, ?, ?>) super.getOperator();
+	}
+
+	@Override
+	public String getName() {
+		return "CoGroup";
+	}
+
+	@Override
+	protected List<OperatorDescriptorDual> getPossibleProperties() {
+		return this.dataProperties;
+	}
+	
+	public void makeCoGroupWithSolutionSet(int solutionsetInputIndex) {
+		OperatorDescriptorDual op;
+		if (solutionsetInputIndex == 0) {
+			op = new CoGroupWithSolutionSetFirstDescriptor(keys1, keys2);
+		} else if (solutionsetInputIndex == 1) {
+			op = new CoGroupWithSolutionSetSecondDescriptor(keys1, keys2);
+		} else {
+			throw new IllegalArgumentException();
+		}
+		this.dataProperties = Collections.<OperatorDescriptorDual>singletonList(op);
+	}
+
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		// for CoGroup, we currently make no reasonable default estimates
+	}
+	
+	private List<OperatorDescriptorDual> initializeDataProperties(Partitioner<?> customPartitioner) {
+		CoGroupOperatorBase<?, ?, ?, ?> cgc = getOperator();
+		Ordering groupOrder1 = cgc.getGroupOrderForInputOne();
+		Ordering groupOrder2 = cgc.getGroupOrderForInputTwo();
+		
+		if (groupOrder1 != null && groupOrder1.getNumberOfFields() == 0) {
+			groupOrder1 = null;
+		}
+		if (groupOrder2 != null && groupOrder2.getNumberOfFields() == 0) {
+			groupOrder2 = null;
+		}
+		
+		CoGroupDescriptor descr = new CoGroupDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2);
+		if (customPartitioner != null) {
+			descr.setCustomPartitioner(customPartitioner);
+		}
+		
+		return Collections.<OperatorDescriptorDual>singletonList(descr);
+	}
+}

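One small normalization above is easy to miss: a group ordering with zero fields carries no information, so CoGroupNode treats it like no ordering at all. A self-contained sketch of just that rule, using a hypothetical stand-in for the Ordering type:

    public class GroupOrderSketch {

        // Hypothetical minimal stand-in for Ordering; only the field count matters here.
        static final class SimpleOrdering {
            private final int numberOfFields;
            SimpleOrdering(int numberOfFields) { this.numberOfFields = numberOfFields; }
            int getNumberOfFields() { return numberOfFields; }
        }

        // Mirrors initializeDataProperties(): an empty group order is
        // normalized to null before the CoGroup descriptor is built.
        static SimpleOrdering normalize(SimpleOrdering order) {
            return (order != null && order.getNumberOfFields() == 0) ? null : order;
        }

        public static void main(String[] args) {
            System.out.println(normalize(new SimpleOrdering(0)));         // null
            System.out.println(normalize(new SimpleOrdering(2)) != null); // true
        }
    }
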
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
new file mode 100644
index 0000000..93be1e4
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.CollectorMapDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>Map</i> operator node.
+ */
+public class CollectorMapNode extends SingleInputNode {
+	
+	private final List<OperatorDescriptorSingle> possibleProperties;
+
+	
+	public CollectorMapNode(SingleInputOperator<?, ?, ?> operator) {
+		super(operator);
+		
+		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new CollectorMapDescriptor());
+	}
+
+	@Override
+	public String getName() {
+		return "Map";
+	}
+
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return this.possibleProperties;
+	}
+
+	/**
+	 * Computes the estimates for the Map operator. Map takes one value and transforms it into another value.
+	 * The cardinality consequently stays the same.
+	 */
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
new file mode 100644
index 0000000..8de67e8
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.CrossOperatorBase;
+import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.operators.CrossBlockOuterFirstDescriptor;
+import org.apache.flink.optimizer.operators.CrossBlockOuterSecondDescriptor;
+import org.apache.flink.optimizer.operators.CrossStreamOuterFirstDescriptor;
+import org.apache.flink.optimizer.operators.CrossStreamOuterSecondDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * The Optimizer representation of a <i>Cross</i> (Cartesian product) operator.
+ */
+public class CrossNode extends TwoInputNode {
+	
+	private final List<OperatorDescriptorDual> dataProperties;
+	
+	/**
+	 * Creates a new CrossNode for the given operator.
+	 * 
+	 * @param operation The Cross operator object.
+	 */
+	public CrossNode(CrossOperatorBase<?, ?, ?, ?> operation) {
+		super(operation);
+		
+		Configuration conf = operation.getParameters();
+		String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
+	
+		CrossHint hint = operation.getCrossHint();
+		
+		if (localStrategy != null) {
+			
+			final boolean allowBCfirst = hint != CrossHint.SECOND_IS_SMALL;
+			final boolean allowBCsecond = hint != CrossHint.FIRST_IS_SMALL;
+			
+			final OperatorDescriptorDual fixedDriverStrat;
+			if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) {
+				fixedDriverStrat = new CrossBlockOuterFirstDescriptor(allowBCfirst, allowBCsecond);
+			} else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) {
+				fixedDriverStrat = new CrossBlockOuterSecondDescriptor(allowBCfirst, allowBCsecond);
+			} else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) {
+				fixedDriverStrat = new CrossStreamOuterFirstDescriptor(allowBCfirst, allowBCsecond);
+			} else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) {
+				fixedDriverStrat = new CrossStreamOuterSecondDescriptor(allowBCfirst, allowBCsecond);
+			} else {
+				throw new CompilerException("Invalid local strategy hint for cross contract: " + localStrategy);
+			}
+			
+			this.dataProperties = Collections.singletonList(fixedDriverStrat);
+		}
+		else if (hint == CrossHint.SECOND_IS_SMALL) {
+			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+			list.add(new CrossBlockOuterSecondDescriptor(false, true));
+			list.add(new CrossStreamOuterFirstDescriptor(false, true));
+			this.dataProperties = list;
+		}
+		else if (hint == CrossHint.FIRST_IS_SMALL) {
+			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+			list.add(new CrossBlockOuterFirstDescriptor(true, false));
+			list.add(new CrossStreamOuterSecondDescriptor(true, false));
+			this.dataProperties = list;
+		}
+		else {
+			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+			list.add(new CrossBlockOuterFirstDescriptor());
+			list.add(new CrossBlockOuterSecondDescriptor());
+			list.add(new CrossStreamOuterFirstDescriptor());
+			list.add(new CrossStreamOuterSecondDescriptor());
+			this.dataProperties = list;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public CrossOperatorBase<?, ?, ?, ?> getOperator() {
+		return (CrossOperatorBase<?, ?, ?, ?>) super.getOperator();
+	}
+
+	@Override
+	public String getName() {
+		return "Cross";
+	}
+	
+	@Override
+	protected List<OperatorDescriptorDual> getPossibleProperties() {
+		return this.dataProperties;
+	}
+
+	/**
+	 * We assume that the cardinality is the product of the input cardinalities
+	 * and that the result width is the sum of the input widths.
+	 * 
+	 * @param statistics The statistics object to optionally access.
+	 */
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
+		long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
+		this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 * card2;
+		
+		if (this.estimatedNumRecords >= 0) {
+			float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+			float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+			float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
+			
+			if (width > 0) {
+				this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
+			}
+		}
+	}
+}

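The estimate arithmetic in computeOperatorSpecificDefaultEstimates is simple enough to check by hand. The following self-contained sketch reproduces it, with -1 denoting "unknown" as in the optimizer; the class and method names are illustrative only.

    public class CrossEstimateSketch {

        // Cardinality is the product of the inputs; -1 propagates as "unknown".
        static long numRecords(long card1, long card2) {
            return (card1 < 0 || card2 < 0) ? -1 : card1 * card2;
        }

        // Output size is (sum of record widths) * cardinality, if both widths are known.
        static long outputSize(long numRecords, float width1, float width2) {
            if (numRecords < 0 || width1 <= 0 || width2 <= 0) {
                return -1;
            }
            return (long) ((width1 + width2) * numRecords);
        }

        public static void main(String[] args) {
            // 1000 x 2000 records of width 8 and 16 bytes:
            long records = numRecords(1000, 2000);          // 2,000,000
            System.out.println(outputSize(records, 8, 16)); // 48,000,000
        }
    }
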
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
new file mode 100644
index 0000000..4e65976
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.plandump.DumpableConnection;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * A connection between two operators. Represents an intermediate result
+ * and a data exchange between the two operators.
+ *
+ * The data exchange happens in one of two modes (batch / pipelined).
+ *
+ * The data exchange strategy may be pre-set on this connection, in which case
+ * it is fixed and will not be determined during candidate plan enumeration.
+ *
+ * During the enumeration of interesting properties, this connection also holds
+ * all interesting properties generated by the successor operator.
+ */
+public class DagConnection implements EstimateProvider, DumpableConnection<OptimizerNode> {
+	
+	private final OptimizerNode source; // The source node of the connection
+
+	private final OptimizerNode target; // The target node of the connection.
+
+	private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange
+
+	private InterestingProperties interestingProps; // properties that succeeding nodes are interested in
+
+	private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined.
+	
+	private TempMode materializationMode = TempMode.NONE; // the materialization mode
+	
+	private int maxDepth = -1;
+
+	private boolean breakPipeline;  // whether this connection should break the pipeline due to potential deadlocks
+
+	/**
+	 * Creates a new Connection between two nodes. The ship strategy is left undetermined
+	 * (to be fixed during plan enumeration). The temp mode is by default <tt>NONE</tt>.
+	 * 
+	 * @param source
+	 *        The source node.
+	 * @param target
+	 *        The target node.
+	 */
+	public DagConnection(OptimizerNode source, OptimizerNode target, ExecutionMode exchangeMode) {
+		this(source, target, null, exchangeMode);
+	}
+
+	/**
+	 * Creates a new Connection between two nodes.
+	 * 
+	 * @param source
+	 *        The source node.
+	 * @param target
+	 *        The target node.
+	 * @param shipStrategy
+	 *        The shipping strategy.
+	 * @param exchangeMode
+	 *        The data exchange mode (pipelined / batch / batch only for shuffles / ... )
+	 */
+	public DagConnection(OptimizerNode source, OptimizerNode target,
+							ShipStrategyType shipStrategy, ExecutionMode exchangeMode)
+	{
+		if (source == null || target == null) {
+			throw new NullPointerException("Source and target must not be null.");
+		}
+		this.source = source;
+		this.target = target;
+		this.shipStrategy = shipStrategy;
+		this.dataExchangeMode = exchangeMode;
+	}
+	
+	/**
+	 * Constructor to create a result from an operator that is not
+	 * consumed by another operator.
+	 * 
+	 * @param source
+	 *        The source node.
+	 */
+	public DagConnection(OptimizerNode source, ExecutionMode exchangeMode) {
+		if (source == null) {
+			throw new NullPointerException("Source must not be null.");
+		}
+		this.source = source;
+		this.target = null;
+		this.shipStrategy = ShipStrategyType.NONE;
+		this.dataExchangeMode = exchangeMode;
+	}
+
+	/**
+	 * Gets the source of the connection.
+	 * 
+	 * @return The source Node.
+	 */
+	public OptimizerNode getSource() {
+		return this.source;
+	}
+
+	/**
+	 * Gets the target of the connection.
+	 * 
+	 * @return The target node.
+	 */
+	public OptimizerNode getTarget() {
+		return this.target;
+	}
+
+	/**
+	 * Gets the shipping strategy for this connection.
+	 * 
+	 * @return The connection's shipping strategy.
+	 */
+	public ShipStrategyType getShipStrategy() {
+		return this.shipStrategy;
+	}
+
+	/**
+	 * Sets the shipping strategy for this connection.
+	 * 
+	 * @param strategy
+	 *        The shipping strategy to be applied to this connection.
+	 */
+	public void setShipStrategy(ShipStrategyType strategy) {
+		this.shipStrategy = strategy;
+	}
+
+	/**
+	 * Gets the data exchange mode to use for this connection.
+	 *
+	 * @return The data exchange mode to use for this connection.
+	 */
+	public ExecutionMode getDataExchangeMode() {
+		if (dataExchangeMode == null) {
+			throw new IllegalStateException("This connection does not have the data exchange mode set");
+		}
+		return dataExchangeMode;
+	}
+
+	/**
+	 * Marks that this connection should do a decoupled data exchange (such as batched)
+	 * rather than pipelining data. Connections are marked as pipeline breakers to avoid
+	 * deadlock situations.
+	 */
+	public void markBreaksPipeline() {
+		this.breakPipeline = true;
+	}
+
+	/**
+	 * Checks whether this connection is marked to break the pipeline.
+	 *
+	 * @return True, if this connection is marked to break the pipeline, false otherwise.
+	 */
+	public boolean isBreakingPipeline() {
+		return this.breakPipeline;
+	}
+
+	/**
+	 * Gets the interesting properties object for this connection.
+	 * If the interesting properties for this connection have not yet been set,
+	 * this method returns null.
+	 * 
+	 * @return The collection of all interesting properties, or null, if they have not yet been set.
+	 */
+	public InterestingProperties getInterestingProperties() {
+		return this.interestingProps;
+	}
+
+	/**
+	 * Sets the interesting properties for this connection.
+	 * 
+	 * @param props The interesting properties.
+	 */
+	public void setInterestingProperties(InterestingProperties props) {
+		if (this.interestingProps == null) {
+			this.interestingProps = props;
+		} else {
+			throw new IllegalStateException("Interesting Properties have already been set.");
+		}
+	}
+	
+	public void clearInterestingProperties() {
+		this.interestingProps = null;
+	}
+	
+	public void initMaxDepth() {
+		
+		if (this.maxDepth == -1) {
+			this.maxDepth = this.source.getMaxDepth() + 1;
+		} else {
+			throw new IllegalStateException("Maximum path depth has already been initialized.");
+		}
+	}
+	
+	public int getMaxDepth() {
+		if (this.maxDepth != -1) {
+			return this.maxDepth;
+		} else {
+			throw new IllegalStateException("Maximum path depth has not been initialized.");
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Estimates
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public long getEstimatedOutputSize() {
+		return this.source.getEstimatedOutputSize();
+	}
+
+	@Override
+	public long getEstimatedNumRecords() {
+		return this.source.getEstimatedNumRecords();
+	}
+	
+	@Override
+	public float getEstimatedAvgWidthPerOutputRecord() {
+		return this.source.getEstimatedAvgWidthPerOutputRecord();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	
+	public TempMode getMaterializationMode() {
+		return this.materializationMode;
+	}
+	
+	public void setMaterializationMode(TempMode materializationMode) {
+		this.materializationMode = materializationMode;
+	}
+	
+	public boolean isOnDynamicPath() {
+		return this.source.isOnDynamicPath();
+	}
+	
+	public int getCostWeight() {
+		return this.source.getCostWeight();
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		StringBuilder buf = new StringBuilder(50);
+		buf.append("Connection: ");
+
+		if (this.source == null) {
+			buf.append("null");
+		} else {
+			buf.append(this.source.getOperator().getName());
+			buf.append('(').append(this.source.getName()).append(')');
+		}
+
+		buf.append(" -> ");
+
+		if (this.shipStrategy != null) {
+			buf.append('[');
+			buf.append(this.shipStrategy.name());
+			buf.append(']').append(' ');
+		}
+
+		if (this.target == null) {
+			buf.append("null");
+		} else {
+			buf.append(this.target.getOperator().getName());
+			buf.append('(').append(this.target.getName()).append(')');
+		}
+
+		return buf.toString();
+	}
+}

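setInterestingProperties / clearInterestingProperties implement an assign-once slot that can be reset between traversals, which is exactly what the two-pass interesting-properties computation in BulkIterationNode above relies on (set for the first pass, clear, set the merged properties for the second pass). A minimal sketch of the pattern with a placeholder type:

    public class SetOnceSlotSketch<T> {

        private T value;

        // Mirrors setInterestingProperties(): assignment is allowed only
        // once per traversal.
        public void set(T v) {
            if (this.value != null) {
                throw new IllegalStateException("Interesting properties have already been set.");
            }
            this.value = v;
        }

        // Mirrors clearInterestingProperties(): clearing re-enables assignment.
        public void clear() {
            this.value = null;
        }

        // Mirrors getInterestingProperties(): null if nothing has been set.
        public T get() {
            return this.value;
        }
    }
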
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
new file mode 100644
index 0000000..dbe04f4
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSinkNode.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.util.Visitor;
+
+/**
+ * The Optimizer representation of a data sink.
+ */
+public class DataSinkNode extends OptimizerNode {
+	
+	protected DagConnection input;			// The input edge
+	
+	/**
+	 * Creates a new DataSinkNode for the given sink operator.
+	 * 
+	 * @param sink The data sink contract object.
+	 */
+	public DataSinkNode(GenericDataSinkBase<?> sink) {
+		super(sink);
+	}
+
+	// --------------------------------------------------------------------------------------
+	
+	/**
+	 * Gets the input of the sink.
+	 * 
+	 * @return The input connection.
+	 */
+	public DagConnection getInputConnection() {
+		return this.input;
+	}
+	
+	/**
+	 * Gets the predecessor of this node.
+	 *
+	 * @return The predecessor, or null, if no predecessor has been set.
+	 */
+	public OptimizerNode getPredecessorNode() {
+		if (this.input != null) {
+			return input.getSource();
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Gets the operator for which this optimizer sink node was created.
+	 * 
+	 * @return The node's underlying operator.
+	 */
+	@Override
+	public GenericDataSinkBase<?> getOperator() {
+		return (GenericDataSinkBase<?>) super.getOperator();
+	}
+
+	@Override
+	public String getName() {
+		return "Data Sink";
+	}
+
+	@Override
+	public List<DagConnection> getIncomingConnections() {
+		return Collections.singletonList(this.input);
+	}
+
+	/**
+	 * Gets all outgoing connections, which is an empty set for the data sink.
+	 *
+	 * @return An empty list.
+	 */
+	@Override
+	public List<DagConnection> getOutgoingConnections() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultExchangeMode) {
+		Operator<?> input = getOperator().getInput();
+
+		OptimizerNode pred = contractToNode.get(input);
+		DagConnection conn = new DagConnection(pred, this, defaultExchangeMode);
+
+		// register the connection on both sides
+		this.input = conn;
+		pred.addOutgoingConnection(conn);
+	}
+
+	/**
+	 * Computes the estimated outputs for the data sink. Since the sink does not modify anything, it simply
+	 * copies the output estimates from its direct predecessor.
+	 */
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
+		this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
+	}
+
+	@Override
+	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
+		final InterestingProperties iProps = new InterestingProperties();
+		
+		{
+			final Ordering partitioning = getOperator().getPartitionOrdering();
+			final DataDistribution dataDist = getOperator().getDataDistribution();
+			final RequestedGlobalProperties partitioningProps = new RequestedGlobalProperties();
+			if (partitioning != null) {
+				if (dataDist != null) {
+					partitioningProps.setRangePartitioned(partitioning, dataDist);
+				} else {
+					partitioningProps.setRangePartitioned(partitioning);
+				}
+			}
+			iProps.addGlobalProperties(partitioningProps);
+		}
+		
+		{
+			final Ordering localOrder = getOperator().getLocalOrder();
+			final RequestedLocalProperties orderProps = new RequestedLocalProperties();
+			if (localOrder != null) {
+				orderProps.setOrdering(localOrder);
+			}
+			iProps.addLocalProperties(orderProps);
+		}
+		
+		this.input.setInterestingProperties(iProps);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                                     Branch Handling
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void computeUnclosedBranchStack() {
+		if (this.openBranches != null) {
+			return;
+		}
+
+		// we need to track open branches even in the sinks, because they get "closed" when
+		// we build a single "root" for the data flow plan
+		addClosedBranches(getPredecessorNode().closedBranchingNodes);
+		this.openBranches = getPredecessorNode().getBranchesForParent(this.input);
+	}
+	
+	@Override
+	protected List<UnclosedBranchDescriptor> getBranchesForParent(DagConnection parent) {
+		// return our own stack of open branches, because nothing is added
+		return this.openBranches;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//                                   Recursive Optimization
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
+		// check if we have a cached version
+		if (this.cachedPlans != null) {
+			return this.cachedPlans;
+		}
+		
+		// calculate alternative sub-plans for predecessor
+		List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
+		List<PlanNode> outputPlans = new ArrayList<PlanNode>();
+		
+		final int dop = getParallelism();
+		final int inDop = getPredecessorNode().getParallelism();
+
+		final ExecutionMode executionMode = this.input.getDataExchangeMode();
+		final boolean dopChange = dop != inDop;
+		final boolean breakPipeline = this.input.isBreakingPipeline();
+
+		InterestingProperties ips = this.input.getInterestingProperties();
+		for (PlanNode p : subPlans) {
+			for (RequestedGlobalProperties gp : ips.getGlobalProperties()) {
+				for (RequestedLocalProperties lp : ips.getLocalProperties()) {
+					Channel c = new Channel(p);
+					gp.parameterizeChannel(c, dopChange, executionMode, breakPipeline);
+					lp.parameterizeChannel(c);
+					c.setRequiredLocalProps(lp);
+					c.setRequiredGlobalProps(gp);
+					
+					// no need to check whether the created properties meet the requirements for
+					// ordering or global ordering, because the only interesting properties we
+					// have are exactly the ones we require
+					outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getOperator().getName()+")" ,c));
+				}
+			}
+		}
+		
+		// cost and prune the plans
+		for (PlanNode node : outputPlans) {
+			estimator.costOperator(node);
+		}
+		prunePlanAlternatives(outputPlans);
+
+		this.cachedPlans = outputPlans;
+		return outputPlans;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//                                   Function Annotation Handling
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public SemanticProperties getSemanticProperties() {
+		return new EmptySemanticProperties();
+	}
+		
+	// --------------------------------------------------------------------------------------------
+	//                                     Miscellaneous
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void accept(Visitor<OptimizerNode> visitor) {
+		if (visitor.preVisit(this)) {
+			if (getPredecessorNode() != null) {
+				getPredecessorNode().accept(visitor);
+			} else {
+				throw new CompilerException();
+			}
+			visitor.postVisit(this);
+		}
+	}
+}

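getAlternativePlans above enumerates one sink candidate per combination of predecessor plan, requested global properties, and requested local properties, then costs and prunes them. A stripped-down sketch of that cross-product enumeration, with strings standing in for plans, channels, and property sets (all names illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class SinkEnumerationSketch {

        static List<String> enumerate(List<String> subPlans,
                                      List<String> requestedGlobal,
                                      List<String> requestedLocal) {
            List<String> candidates = new ArrayList<>();
            for (String plan : subPlans) {
                for (String gp : requestedGlobal) {
                    for (String lp : requestedLocal) {
                        // one parameterized channel / sink candidate per
                        // combination; costing and pruning happen afterwards
                        candidates.add(plan + " via [" + gp + ", " + lp + "]");
                    }
                }
            }
            return candidates;
        }

        public static void main(String[] args) {
            List<String> out = enumerate(
                    List.of("planA", "planB"),
                    List.of("range-partitioned", "any-distribution"),
                    List.of("sorted"));
            System.out.println(out.size()); // 2 x 2 x 1 = 4 candidates before pruning
        }
    }
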
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
new file mode 100644
index 0000000..e4b35b7
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/DataSourceNode.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.ReplicatingInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.GenericDataSourceBase.SplitDataProperties;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Visitor;
+
+/**
+ * The optimizer's internal representation of a data source.
+ */
+public class DataSourceNode extends OptimizerNode {
+	
+	private final boolean sequentialInput;
+
+	private final boolean replicatedInput;
+
+	private GlobalProperties gprops;
+
+	private LocalProperties lprops;
+
+	/**
+	 * Creates a new DataSourceNode for the given contract.
+	 * 
+	 * @param pactContract
+	 *        The data source contract object.
+	 */
+	public DataSourceNode(GenericDataSourceBase<?, ?> pactContract) {
+		super(pactContract);
+		
+		if (pactContract.getUserCodeWrapper().getUserCodeClass() == null) {
+			throw new IllegalArgumentException("Input format has not been set.");
+		}
+		
+		if (NonParallelInput.class.isAssignableFrom(pactContract.getUserCodeWrapper().getUserCodeClass())) {
+			setDegreeOfParallelism(1);
+			this.sequentialInput = true;
+		} else {
+			this.sequentialInput = false;
+		}
+
+		this.replicatedInput = ReplicatingInputFormat.class.isAssignableFrom(
+														pactContract.getUserCodeWrapper().getUserCodeClass());
+
+		this.gprops = new GlobalProperties();
+		this.lprops = new LocalProperties();
+
+		SplitDataProperties<?> splitProps = pactContract.getSplitDataProperties();
+
+		if(replicatedInput) {
+			this.gprops.setFullyReplicated();
+			this.lprops = new LocalProperties();
+		} else if (splitProps != null) {
+			// configure data properties of data source using split properties
+			setDataPropertiesFromSplitProperties(splitProps);
+		}
+
+	}
+
+	/**
+	 * Gets the contract object for this data source node.
+	 * 
+	 * @return The contract.
+	 */
+	@Override
+	public GenericDataSourceBase<?, ?> getOperator() {
+		return (GenericDataSourceBase<?, ?>) super.getOperator();
+	}
+
+	@Override
+	public String getName() {
+		return "Data Source";
+	}
+
+	@Override
+	public void setDegreeOfParallelism(int degreeOfParallelism) {
+		// if unsplittable, parallelism remains at 1
+		if (!this.sequentialInput) {
+			super.setDegreeOfParallelism(degreeOfParallelism);
+		}
+	}
+
+	@Override
+	public List<DagConnection> getIncomingConnections() {
+		return Collections.<DagConnection>emptyList();
+	}
+
+	@Override
+	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode defaultDataExchangeMode) {}
+
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		// see if we have a statistics object that can tell us a bit about the file
+		if (statistics != null) {
+			// instantiate the input format, as this is needed by the statistics 
+			InputFormat<?, ?> format;
+			String inFormatDescription = "<unknown>";
+			
+			try {
+				format = getOperator().getFormatWrapper().getUserCodeObject();
+				Configuration config = getOperator().getParameters();
+				format.configure(config);
+			}
+			catch (Throwable t) {
+				if (Optimizer.LOG.isWarnEnabled()) {
+					Optimizer.LOG.warn("Could not instantiate InputFormat to obtain statistics."
+						+ " Limited statistics will be available.", t);
+				}
+				return;
+			}
+			try {
+				inFormatDescription = format.toString();
+			}
+			catch (Throwable t) {
+				// we can ignore this error, as it only prevents us from using a cosmetic string
+			}
+			
+			// first of all, get the statistics from the cache
+			final String statisticsKey = getOperator().getStatisticsKey();
+			final BaseStatistics cachedStatistics = statistics.getBaseStatistics(statisticsKey);
+			
+			BaseStatistics bs = null;
+			try {
+				bs = format.getStatistics(cachedStatistics);
+			}
+			catch (Throwable t) {
+				if (Optimizer.LOG.isWarnEnabled()) {
+					Optimizer.LOG.warn("Error obtaining statistics from input format: " + t.getMessage(), t);
+				}
+			}
+			
+			if (bs != null) {
+				final long len = bs.getTotalInputSize();
+				if (len == BaseStatistics.SIZE_UNKNOWN) {
+					if (Optimizer.LOG.isInfoEnabled()) {
+						Optimizer.LOG.info("Compiler could not determine the size of input '" + inFormatDescription + "'. Using default estimates.");
+					}
+				}
+				else if (len >= 0) {
+					this.estimatedOutputSize = len;
+				}
+				
+				final long card = bs.getNumberOfRecords();
+				if (card != BaseStatistics.NUM_RECORDS_UNKNOWN) {
+					this.estimatedNumRecords = card;
+				}
+			}
+		}
+	}
+
+	@Override
+	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
+		// no children, so nothing to compute
+	}
+
+	@Override
+	public void computeUnclosedBranchStack() {
+		// because there are no inputs, there are no unclosed branches.
+		this.openBranches = Collections.emptyList();
+	}
+
+	@Override
+	public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
+		if (this.cachedPlans != null) {
+			return this.cachedPlans;
+		}
+
+		SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getOperator().getName()+")",
+				this.gprops, this.lprops);
+
+		if(!replicatedInput) {
+			candidate.updatePropertiesWithUniqueSets(getUniqueFields());
+
+			final Costs costs = new Costs();
+			if (FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass()) &&
+					this.estimatedOutputSize >= 0) {
+				estimator.addFileInputCost(this.estimatedOutputSize, costs);
+			}
+			candidate.setCosts(costs);
+		} else {
+			// replicated input
+			final Costs costs = new Costs();
+			InputFormat<?,?> inputFormat =
+					((ReplicatingInputFormat<?,?>) getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
+			if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) &&
+					this.estimatedOutputSize >= 0) {
+				estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), costs);
+			}
+			candidate.setCosts(costs);
+		}
+
+		// since there is only a single plan for the data-source, return a list with that element only
+		List<PlanNode> plans = new ArrayList<PlanNode>(1);
+		plans.add(candidate);
+
+		this.cachedPlans = plans;
+		return plans;
+	}
+
+	@Override
+	public SemanticProperties getSemanticProperties() {
+		return new EmptySemanticProperties();
+	}
+	
+	@Override
+	public void accept(Visitor<OptimizerNode> visitor) {
+		if (visitor.preVisit(this)) {
+			visitor.postVisit(this);
+		}
+	}
+
+	private void setDataPropertiesFromSplitProperties(SplitDataProperties<?> splitProps) {
+
+		// set global properties
+		int[] partitionKeys = splitProps.getSplitPartitionKeys();
+		Partitioner<?> partitioner = splitProps.getSplitPartitioner();
+
+		if(partitionKeys != null && partitioner != null) {
+			this.gprops.setCustomPartitioned(new FieldList(partitionKeys), partitioner);
+		}
+		else if(partitionKeys != null) {
+			this.gprops.setAnyPartitioning(new FieldList(partitionKeys));
+		}
+		// set local properties
+		int[] groupingKeys = splitProps.getSplitGroupKeys();
+		Ordering ordering = splitProps.getSplitOrder();
+
+		// more than one split per source task is possible,
+		// so adapt the split grouping and sorting accordingly
+		if (ordering != null) {
+
+			// sorting falls back to grouping because a source can read multiple,
+			// randomly assigned splits
+			groupingKeys = ordering.getFieldPositions();
+		}
+
+		if (groupingKeys != null && partitionKeys != null) {
+			// check if the grouping is also valid across splits, i.e., whether the grouping keys
+			// are a valid superset of the partition keys
+			boolean allFieldsIncluded = true;
+			for(int i : partitionKeys) {
+				boolean fieldIncluded = false;
+				for(int j : groupingKeys) {
+					if(i == j) {
+						fieldIncluded = true;
+						break;
+					}
+				}
+				if(!fieldIncluded) {
+					allFieldsIncluded = false;
+					break;
+				}
+			}
+			if (allFieldsIncluded) {
+				this.lprops = LocalProperties.forGrouping(new FieldList(groupingKeys));
+			} else {
+				this.lprops = new LocalProperties();
+			}
+
+		} else {
+			this.lprops = new LocalProperties();
+		}
+	}
+}
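
computeOperatorSpecificDefaultEstimates() above takes its numbers from whatever BaseStatistics object the input format reports. As a rough sketch (KnownSizeStatistics is hypothetical, not part of this patch; the interface and its UNKNOWN constants are the ones imported above), a format whose input size is known up front could report:

    /** Sketch: fixed statistics for an input whose size is known ahead of time. */
    class KnownSizeStatistics implements BaseStatistics {

        @Override
        public long getTotalInputSize() {
            return 64L * 1024 * 1024;   // 64 MB of total input
        }

        @Override
        public long getNumberOfRecords() {
            return 1000000L;            // one million records
        }

        @Override
        public float getAverageRecordWidth() {
            return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;   // leave the width open
        }
    }

An input format would return such an object from getStatistics(cachedStatistics); the code above then copies the reported size and cardinality into estimatedOutputSize and estimatedNumRecords.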

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
new file mode 100644
index 0000000..482951b
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/EstimateProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+/**
+ * Methods for operators / connections that provide estimates about data size and
+ * data characteristics.
+ */
+public interface EstimateProvider {
+	
+	/**
+	 * Gets the estimated output size from this node.
+	 * 
+	 * @return The estimated output size.
+	 */
+	long getEstimatedOutputSize();
+
+	/**
+	 * Gets the estimated number of records in the output of this node.
+	 * 
+	 * @return The estimated number of records.
+	 */
+	long getEstimatedNumRecords();
+	
+	/**
+	 * Gets the estimated number of bytes per record.
+	 * 
+	 * @return The estimated number of bytes per record.
+	 */
+	float getEstimatedAvgWidthPerOutputRecord();
+}
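
The three estimates are deliberately related: when the output size is unknown, it can often be derived from the other two. A hedged helper illustrating that relation (hypothetical, not part of this patch; the convention that negative values mean "unknown" is an assumption here):

    /** Sketch: derive a missing output-size estimate; assumes negative values mean "unknown". */
    static long deriveOutputSize(EstimateProvider provider) {
        long size = provider.getEstimatedOutputSize();
        if (size >= 0) {
            return size;   // the provider already knows its output size
        }
        long records = provider.getEstimatedNumRecords();
        float width = provider.getEstimatedAvgWidthPerOutputRecord();
        // fall back to records * average width when both are known
        return (records >= 0 && width > 0) ? (long) (records * width) : -1;
    }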

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
new file mode 100644
index 0000000..118ddc8
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FilterNode.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
+import org.apache.flink.api.common.operators.base.FilterOperatorBase;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.FilterDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>Filter</i> operator node.
+ */
+public class FilterNode extends SingleInputNode {
+	
+	private final List<OperatorDescriptorSingle> possibleProperties;
+	
+	public FilterNode(FilterOperatorBase<?, ?> operator) {
+		super(operator);
+		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new FilterDescriptor());
+	}
+
+	@Override
+	public FilterOperatorBase<?, ?> getOperator() {
+		return (FilterOperatorBase<?, ?>) super.getOperator();
+	}
+
+	@Override
+	public String getName() {
+		return "Filter";
+	}
+
+	@Override
+	public SemanticProperties getSemanticProperties() {
+		return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
+	}
+
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return this.possibleProperties;
+	}
+
+	/**
+	 * Computes the estimates for the Filter operator. Since the filter discards records, we assume a
+	 * cardinality decrease. Lacking better information, we apply a default selectivity of 0.5 to both
+	 * the record count and the output size.
+	 */
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		this.estimatedNumRecords = (long) (getPredecessorNode().getEstimatedNumRecords() * 0.5);
+		this.estimatedOutputSize = (long) (getPredecessorNode().getEstimatedOutputSize() * 0.5);
+	}
+}
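
With that default selectivity, an input estimated at one million records yields an estimate of half a million output records. Spelled out with illustrative values only:

    long inputRecords = 1000000L;
    long estimatedOutput = (long) (inputRecords * 0.5);   // 500000, mirroring the default above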

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
new file mode 100644
index 0000000..f713d56
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/FlatMapNode.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.FlatMapDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+
+/**
+ * The optimizer's internal representation of a <i>FlatMap</i> operator node.
+ */
+public class FlatMapNode extends SingleInputNode {
+	
+	private final List<OperatorDescriptorSingle> possibleProperties;
+	
+	public FlatMapNode(FlatMapOperatorBase<?, ?, ?> operator) {
+		super(operator);
+		
+		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new FlatMapDescriptor());
+	}
+
+	@Override
+	public FlatMapOperatorBase<?, ?, ?> getOperator() {
+		return (FlatMapOperatorBase<?, ?, ?>) super.getOperator();
+	}
+
+	@Override
+	public String getName() {
+		return "FlatMap";
+	}
+
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return this.possibleProperties;
+	}
+
+	/**
+	 * Computes the estimates for the FlatMap operator. Since FlatMap can un-nest records, we assume a
+	 * cardinality increase. Lacking better information, we use a default factor of 5 for the number of
+	 * output records.
+	 */
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords() * 5;
+	}
+}
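
Note that, unlike the Filter node, only the record count is scaled here: a FlatMap may change the record type and width arbitrarily, so no output-size estimate is derived from the input. With illustrative values only:

    long inputRecords = 200000L;
    long estimatedOutput = inputRecords * 5;   // 1000000, mirroring the default factor above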

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
new file mode 100644
index 0000000..564c0d3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupCombineNode.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.operators.AllGroupCombineProperties;
+import org.apache.flink.optimizer.operators.GroupCombineProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The optimizer representation of a <i>GroupCombine</i> operation.
+ */
+public class GroupCombineNode extends SingleInputNode {
+
+	private final List<OperatorDescriptorSingle> possibleProperties;
+
+	/**
+	 * Creates a new optimizer node for the given operator.
+	 *
+	 * @param operator The reduce operation.
+	 */
+	public GroupCombineNode(GroupCombineOperatorBase<?, ?, ?> operator) {
+		super(operator);
+
+		if (this.keys == null) {
+			// case of a key-less reducer. force a parallelism of 1
+			setDegreeOfParallelism(1);
+		}
+
+		this.possibleProperties = initPossibleProperties();
+	}
+
+	private List<OperatorDescriptorSingle> initPossibleProperties() {
+
+		// check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
+		Ordering groupOrder = getOperator().getGroupOrder();
+		if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
+			groupOrder = null;
+		}
+
+		OperatorDescriptorSingle props = (this.keys == null ?
+				new AllGroupCombineProperties() :
+				new GroupCombineProperties(this.keys, groupOrder));
+
+		return Collections.singletonList(props);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the operator represented by this optimizer node.
+	 *
+	 * @return The operator represented by this optimizer node.
+	 */
+	@Override
+	public GroupCombineOperatorBase<?, ?, ?> getOperator() {
+		return (GroupCombineOperatorBase<?, ?, ?>) super.getOperator();
+	}
+
+	@Override
+	public String getName() {
+		return "GroupCombine";
+	}
+
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return this.possibleProperties;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Estimates
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		// no real estimates possible for a reducer.
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
new file mode 100644
index 0000000..77acae5
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/GroupReduceNode.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.operators.AllGroupReduceProperties;
+import org.apache.flink.optimizer.operators.AllGroupWithPartialPreGroupProperties;
+import org.apache.flink.optimizer.operators.GroupReduceProperties;
+import org.apache.flink.optimizer.operators.GroupReduceWithCombineProperties;
+import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * The optimizer representation of a <i>GroupReduce</i> operation.
+ */
+public class GroupReduceNode extends SingleInputNode {
+	
+	private final List<OperatorDescriptorSingle> possibleProperties;
+	
+	private GroupReduceNode combinerUtilityNode;
+	
+	/**
+	 * Creates a new optimizer node for the given operator.
+	 * 
+	 * @param operator The reduce operation.
+	 */
+	public GroupReduceNode(GroupReduceOperatorBase<?, ?, ?> operator) {
+		super(operator);
+		
+		if (this.keys == null) {
+			// case of a key-less reducer. force a parallelism of 1
+			setDegreeOfParallelism(1);
+		}
+		
+		this.possibleProperties = initPossibleProperties(operator.getCustomPartitioner());
+	}
+	
+	public GroupReduceNode(GroupReduceNode reducerToCopyForCombiner) {
+		super(reducerToCopyForCombiner);
+		
+		this.possibleProperties = Collections.emptyList();
+	}
+	
+	private List<OperatorDescriptorSingle> initPossibleProperties(Partitioner<?> customPartitioner) {
+		// see if an internal hint dictates the strategy to use
+		final Configuration conf = getOperator().getParameters();
+		final String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
+
+		final boolean useCombiner;
+		if (localStrategy != null) {
+			if (Optimizer.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) {
+				useCombiner = false;
+			}
+			else if (Optimizer.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) {
+				if (!isCombineable()) {
+					Optimizer.LOG.warn("Strategy hint for GroupReduce '" + getOperator().getName() +
+						"' requires combinable reduce, but user function is not marked combinable.");
+				}
+				useCombiner = true;
+			} else {
+				throw new CompilerException("Invalid local strategy hint for GroupReduce contract: " + localStrategy);
+			}
+		} else {
+			useCombiner = isCombineable();
+		}
+		
+		// check if we can work with a grouping (simple reducer), or if we need ordering because of a group order
+		Ordering groupOrder = null;
+		if (getOperator() instanceof GroupReduceOperatorBase) {
+			groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) getOperator()).getGroupOrder();
+			if (groupOrder != null && groupOrder.getNumberOfFields() == 0) {
+				groupOrder = null;
+			}
+		}
+		
+		OperatorDescriptorSingle props = useCombiner ?
+			(this.keys == null ? new AllGroupWithPartialPreGroupProperties() : new GroupReduceWithCombineProperties(this.keys, groupOrder, customPartitioner)) :
+			(this.keys == null ? new AllGroupReduceProperties() : new GroupReduceProperties(this.keys, groupOrder, customPartitioner));
+
+		return Collections.singletonList(props);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the operator represented by this optimizer node.
+	 * 
+	 * @return The operator represented by this optimizer node.
+	 */
+	@Override
+	public GroupReduceOperatorBase<?, ?, ?> getOperator() {
+		return (GroupReduceOperatorBase<?, ?, ?>) super.getOperator();
+	}
+
+	/**
+	 * Checks, whether a combiner function has been given for the function encapsulated
+	 * by this reduce contract.
+	 * 
+	 * @return True, if a combiner has been given, false otherwise.
+	 */
+	public boolean isCombineable() {
+		return getOperator().isCombinable();
+	}
+
+	@Override
+	public String getName() {
+		return "GroupReduce";
+	}
+	
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return this.possibleProperties;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Estimates
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		// no real estimates possible for a reducer.
+	}
+	
+	public GroupReduceNode getCombinerUtilityNode() {
+		if (this.combinerUtilityNode == null) {
+			this.combinerUtilityNode = new GroupReduceNode(this);
+			
+			// we conservatively assume the combiner returns the same data size as it consumes 
+			this.combinerUtilityNode.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
+			this.combinerUtilityNode.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
+		}
+		return this.combinerUtilityNode;
+	}
+}
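
The local-strategy hint evaluated in initPossibleProperties() is read from the operator's parameter configuration. A sketch of how such a hint would be supplied (the helper method is hypothetical; the constants are the Optimizer fields referenced above):

    /** Sketch: force the combining-sort strategy for a combinable GroupReduce. */
    static void forceCombiningSort(GroupReduceOperatorBase<?, ?, ?> reduceOp) {
        reduceOp.getParameters().setString(
                Optimizer.HINT_LOCAL_STRATEGY,
                Optimizer.HINT_LOCAL_STRATEGY_COMBINING_SORT);
    }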

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
new file mode 100644
index 0000000..1fdae51
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/InterestingPropertiesClearer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.util.Visitor;
+
+final class InterestingPropertiesClearer implements Visitor<OptimizerNode> {
+	
+	static final InterestingPropertiesClearer INSTANCE = new InterestingPropertiesClearer();
+
+	@Override
+	public boolean preVisit(OptimizerNode visitable) {
+		if (visitable.getInterestingProperties() != null) {
+			visitable.clearInterestingProperties();
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public void postVisit(OptimizerNode visitable) {}
+}
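
Since the visitor keeps no state, it is exposed as a singleton. Clearing the cached interesting properties of an entire subtree is then a single traversal (sketch; the helper method is hypothetical and would live in the same package):

    /** Sketch: drop all cached interesting properties below the given node. */
    static void clearAll(OptimizerNode root) {
        root.accept(InterestingPropertiesClearer.INSTANCE);
    }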

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
new file mode 100644
index 0000000..5d28043
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/IterationNode.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import org.apache.flink.util.Visitor;
+
+/**
+ * Interface for optimizer DAG nodes that represent iterations, i.e., nodes whose
+ * step function must be traversed separately from the regular inputs.
+ */
+public interface IterationNode {
+	
+	void acceptForStepFunction(Visitor<OptimizerNode> visitor);
+
+}
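
Visitors that need to see the whole program must treat iterations specially: the step function is not reachable through the regular input connections, which is exactly what acceptForStepFunction() exists for. A hedged traversal fragment (the helper method is hypothetical):

    /** Sketch: visit a node and, if it is an iteration, its step function as well. */
    static void visitWithStepFunction(OptimizerNode node, Visitor<OptimizerNode> visitor) {
        if (node instanceof IterationNode) {
            ((IterationNode) node).acceptForStepFunction(visitor);
        }
        node.accept(visitor);
    }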

