flink-commits mailing list archives

From se...@apache.org
Subject [20/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
Date Fri, 20 Mar 2015 10:06:59 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/DataStatistics.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/DataStatistics.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/DataStatistics.java
new file mode 100644
index 0000000..cf6f4ec
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/DataStatistics.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+
+/**
+ * The collection of access methods that can be used to retrieve statistical information about the
+ * data processed in a job. Currently, this class acts as an entry point only for obtaining cached
+ * statistics.
+ */
+public class DataStatistics {
+	
+	private final Map<String, BaseStatistics> baseStatisticsCache;
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a new statistics object, with an empty cache. 
+	 */
+	public DataStatistics() {
+		this.baseStatisticsCache = new HashMap<String, BaseStatistics>();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Gets the base statistics for the input identified by the given identifier.
+	 *  
+	 * @param inputIdentifier The identifier for the input.
+	 * @return The statistics that were cached for this input.
+	 */
+	public BaseStatistics getBaseStatistics(String inputIdentifier) {
+		synchronized (this.baseStatisticsCache) {
+			return this.baseStatisticsCache.get(inputIdentifier);
+		}
+	}
+	
+	/**
+	 * Caches the given statistics. They are later retrievable under the given identifier.
+	 * 
+	 * @param statistics The statistics to cache.
+	 * @param identifier The identifier which may be later used to retrieve the statistics.
+	 */
+	public void cacheBaseStatistics(BaseStatistics statistics, String identifier) {
+		synchronized (this.baseStatisticsCache) {
+			this.baseStatisticsCache.put(identifier, statistics);
+		}
+	}
+}
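
For orientation, DataStatistics is just a synchronized, keyed cache. A minimal usage sketch (the identifier and the BaseStatistics instance "someStats" are hypothetical; any BaseStatistics implementation works):

    DataStatistics statistics = new DataStatistics();
    // cache statistics under an arbitrary identifier, e.g. an input's path (hypothetical)
    statistics.cacheBaseStatistics(someStats, "hdfs:///data/input");
    // later: returns the cached statistics, or null if nothing was cached under the key
    BaseStatistics cached = statistics.getBaseStatistics("hdfs:///data/input");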

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
new file mode 100644
index 0000000..2101428
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.optimizer.traversals.BinaryUnionReplacer;
+import org.apache.flink.optimizer.traversals.BranchesVisitor;
+import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;
+import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;
+import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
+import org.apache.flink.optimizer.traversals.PlanFinalizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.optimizer.deadlockdetect.DeadlockPreventer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.postpass.OptimizerPostPass;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.util.InstantiationUtil;
+
+/**
+ * The optimizer that takes the user-specified program plan and creates an optimized plan that contains
+ * exact descriptions of how the physical execution will take place. It first translates the user
+ * program into an internal optimizer representation and then chooses between different alternatives
+ * for shipping strategies and local strategies.
+ * <p>
+ * The basic principle is taken from optimizer work in systems such as Volcano/Cascades and Selinger/System-R/DB2. The
+ * optimizer walks from the sinks down, generating interesting properties, and ascends from the sources, generating
+ * alternative plans and pruning them against the interesting properties.
+ * <p>
+ * The optimizer also assigns the memory to the individual tasks. This is currently done in a very simple fashion: All
+ * sub-tasks that need memory (e.g. reduce or join) are given an equal share of memory.
+ */
+public class Optimizer {
+
+	// ------------------------------------------------------------------------
+	// Constants
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Compiler hint key for the input channel's shipping strategy. This String is a key to the operator's stub
+	 * parameters. The corresponding value tells the compiler which shipping strategy to use for the input channel.
+	 * If the operator has two input channels, the shipping strategy is applied to both input channels.
+	 */
+	public static final String HINT_SHIP_STRATEGY = "INPUT_SHIP_STRATEGY";
+
+	/**
+	 * Compiler hint key for the <b>first</b> input channel's shipping strategy. This String is a key to
+	 * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
+	 * to use for the <b>first</b> input channel. Only applicable to operators with two inputs.
+	 */
+	public static final String HINT_SHIP_STRATEGY_FIRST_INPUT = "INPUT_LEFT_SHIP_STRATEGY";
+
+	/**
+	 * Compiler hint key for the <b>second</b> input channel's shipping strategy. This String is a key to
+	 * the operator's stub parameters. The corresponding value tells the compiler which shipping strategy
+	 * to use for the <b>second</b> input channel. Only applicable to operators with two inputs.
+	 */
+	public static final String HINT_SHIP_STRATEGY_SECOND_INPUT = "INPUT_RIGHT_SHIP_STRATEGY";
+
+	/**
+	 * Value for the shipping strategy compiler hint that enforces a <b>Forward</b> strategy on the
+	 * input channel, i.e. no redistribution of any kind.
+	 * 
+	 * @see #HINT_SHIP_STRATEGY
+	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
+	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
+	 */
+	public static final String HINT_SHIP_STRATEGY_FORWARD = "SHIP_FORWARD";
+	
+	/**
+	 * Value for the shipping strategy compiler hint that enforces a random repartition strategy.
+	 * 
+	 * @see #HINT_SHIP_STRATEGY
+	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
+	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
+	 */
+	public static final String HINT_SHIP_STRATEGY_REPARTITION = "SHIP_REPARTITION";
+	
+	/**
+	 * Value for the shipping strategy compiler hint that enforces a hash-partition strategy.
+	 * 
+	 * @see #HINT_SHIP_STRATEGY
+	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
+	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
+	 */
+	public static final String HINT_SHIP_STRATEGY_REPARTITION_HASH = "SHIP_REPARTITION_HASH";
+	
+	/**
+	 * Value for the shipping strategy compiler hint that enforces a range-partition strategy.
+	 * 
+	 * @see #HINT_SHIP_STRATEGY
+	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
+	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
+	 */
+	public static final String HINT_SHIP_STRATEGY_REPARTITION_RANGE = "SHIP_REPARTITION_RANGE";
+
+	/**
+	 * Value for the shipping strategy compiler hint that enforces a <b>broadcast</b> strategy on the
+	 * input channel.
+	 * 
+	 * @see #HINT_SHIP_STRATEGY
+	 * @see #HINT_SHIP_STRATEGY_FIRST_INPUT
+	 * @see #HINT_SHIP_STRATEGY_SECOND_INPUT
+	 */
+	public static final String HINT_SHIP_STRATEGY_BROADCAST = "SHIP_BROADCAST";
+
+	/**
+	 * Compiler hint key for the operator's local strategy. This String is a key to the operator's stub
+	 * parameters. The corresponding value tells the compiler which local strategy to use to process the
+	 * data inside one partition.
+	 * <p>
+	 * This hint is ignored by operators that do not have a local strategy (such as <i>Map</i>), or by operators that
+	 * have no choice in their local strategy (such as <i>Cross</i>).
+	 */
+	public static final String HINT_LOCAL_STRATEGY = "LOCAL_STRATEGY";
+
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
+	 * For example, a <i>Reduce</i> operator will sort the data to group it.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_SORT = "LOCAL_STRATEGY_SORT";
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>sort based</b> local strategy.
+	 * During sorting a combine method is repeatedly applied to reduce the data volume.
+	 * For example, a <i>Reduce</i> operator will sort the data to group it.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_COMBINING_SORT = "LOCAL_STRATEGY_COMBINING_SORT";
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy on both
+	 * inputs with subsequent merging of inputs. 
+	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
+	 * of matching keys.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE = "LOCAL_STRATEGY_SORT_BOTH_MERGE";
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
+	 * The first input is sorted, the second input is assumed to be sorted. After sorting both inputs are merged.
+	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
+	 * of matching keys.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE = "LOCAL_STRATEGY_SORT_FIRST_MERGE";
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>sort merge based</b> local strategy.
+	 * The second input is sorted, the first input is assumed to be sorted. After sorting both inputs are merged.
+	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a sort-merge strategy to find pairs 
+	 * of matching keys.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE = "LOCAL_STRATEGY_SORT_SECOND_MERGE";
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>merge based</b> local strategy.
+	 * Both inputs are assumed to be sorted and are merged. 
+	 * For example, a <i>Match</i> or <i>CoGroup</i> operator will use a merge strategy to find pairs 
+	 * of matching keys.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_MERGE = "LOCAL_STRATEGY_MERGE";
+
+	
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
+	 * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
+	 * matching keys. The <b>first</b> input will be used to build the hash table, the second input will be
+	 * used to probe the table.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST = "LOCAL_STRATEGY_HASH_BUILD_FIRST";
+
+	/**
+	 * Value for the local strategy compiler hint that enforces a <b>hash based</b> local strategy.
+	 * For example, a <i>Match</i> operator will use a hybrid-hash-join strategy to find pairs of
+	 * matching keys. The <b>second</b> input will be used to build the hash table, the first input will be
+	 * used to probe the table.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND = "LOCAL_STRATEGY_HASH_BUILD_SECOND";
+
+	/**
+	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
+	 * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
+	 * Hence, the data of the first input is streamed through, while the data of the second input is stored on
+	 * disk and repeatedly read.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST";
+
+	/**
+	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
+	 * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
+	 * Hence, the data of the second input is streamed through, while the data of the first input is stored on
+	 * disk and repeatedly read.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND";
+
+	/**
+	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
+	 * A <i>Cross</i> operator will process the data of the <b>first</b> input in the outer-loop of the nested loops.
+	 * Furthermore, the first input, being the outer side, will be processed in blocks, and for each block,
+	 * the second input, being the inner side, will be read repeatedly from disk.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST";
+
+	/**
+	 * Value for the local strategy compiler hint that chooses the outer side of the <b>nested-loop</b> local strategy.
+	 * A <i>Cross</i> operator will process the data of the <b>second</b> input in the outer-loop of the nested loops.
+	 * Furthermore, the second input, being the outer side, will be processed in blocks, and for each block,
+	 * the first input, being the inner side, will be read repeatedly from disk.
+	 * 
+	 * @see #HINT_LOCAL_STRATEGY
+	 */
+	public static final String HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND = "LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND";
+	
+	/**
+	 * The log handle that is used by the compiler to log messages.
+	 */
+	public static final Logger LOG = LoggerFactory.getLogger(Optimizer.class);
+
+	// ------------------------------------------------------------------------
+	// Members
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The statistics object used to obtain statistics, such as input sizes,
+	 * for the cost estimation process.
+	 */
+	private final DataStatistics statistics;
+
+	/**
+	 * The cost estimator used by the compiler.
+	 */
+	private final CostEstimator costEstimator;
+
+	/**
+	 * The default degree of parallelism for jobs compiled by this compiler.
+	 */
+	private int defaultDegreeOfParallelism;
+
+
+	// ------------------------------------------------------------------------
+	// Constructor & Setup
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new optimizer instance. The optimizer has no access to statistics about the
+	 * inputs and can hence not determine any properties. It will perform all optimization with
+	 * unknown sizes and hence use only the heuristic cost functions, which result in the selection
+	 * of the most robust execution strategies.
+	 */
+	public Optimizer() {
+		this(null, new DefaultCostEstimator());
+	}
+
+	/**
+	 * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
+	 * Given those statistics, the optimizer can make better choices for the execution strategies.
+	 * 
+	 * @param stats
+	 *        The statistics to be used to determine the input properties.
+	 */
+	public Optimizer(DataStatistics stats) {
+		this(stats, new DefaultCostEstimator());
+	}
+
+	/**
+	 * Creates a new optimizer instance. The optimizer has no access to statistics about the
+	 * inputs and can hence not determine any properties. It will perform all optimization with
+	 * unknown sizes and hence use only the heuristic cost functions, which result in the selection
+	 * of the most robust execution strategies.
+	 *
+	 * The optimizer uses the given cost estimator to compute the costs of the individual operations.
+	 * 
+	 * @param estimator The cost estimator to use to cost the individual operations.
+	 */
+	public Optimizer(CostEstimator estimator) {
+		this(null, estimator);
+	}
+
+	/**
+	 * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
+	 * Given those statistics, the optimizer can make better choices for the execution strategies.
+	 *
+	 * The optimizer uses the given cost estimator to compute the costs of the individual operations.
+	 * 
+	 * @param stats
+	 *        The statistics to be used to determine the input properties.
+	 * @param estimator
+	 *        The <tt>CostEstimator</tt> to use to cost the individual operations.
+	 */
+	public Optimizer(DataStatistics stats, CostEstimator estimator) {
+		this.statistics = stats;
+		this.costEstimator = estimator;
+
+		// determine the default parallelism
+		this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(
+				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
+		
+		if (defaultDegreeOfParallelism < 1) {
+			LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
+					+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY + " is invalid. Ignoring and using a value of 1.");
+			this.defaultDegreeOfParallelism = 1;
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//                             Getters / Setters
+	// ------------------------------------------------------------------------
+	
+	public int getDefaultDegreeOfParallelism() {
+		return defaultDegreeOfParallelism;
+	}
+	
+	public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) {
+		if (defaultDegreeOfParallelism > 0) {
+			this.defaultDegreeOfParallelism = defaultDegreeOfParallelism;
+		} else {
+			throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//                               Compilation
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Translates the given program to an OptimizedPlan, where all nodes have their local strategy assigned
+	 * and all channels have a shipping strategy assigned.
+	 *
+	 * For more details on the optimization phase, see the comments for
+	 * {@link #compile(org.apache.flink.api.common.Plan, org.apache.flink.optimizer.postpass.OptimizerPostPass)}.
+	 * 
+	 * @param program The program to be translated.
+	 * @return The optimized plan.
+	 *
+	 * @throws CompilerException
+	 *         Thrown, if the plan is invalid or the optimizer encountered an inconsistent
+	 *         situation during the compilation process.
+	 */
+	public OptimizedPlan compile(Plan program) throws CompilerException {
+		final OptimizerPostPass postPasser = getPostPassFromPlan(program);
+		return compile(program, postPasser);
+	}
+
+	/**
+	 * Translates the given program to an OptimizedPlan. The optimized plan describes for each operator
+	 * which strategy to use (such as hash join versus sort-merge join), what data exchange method to use
+	 * (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined, batch),
+	 * where to cache intermediate results, etc.
+	 *
+	 * The optimization happens in multiple phases:
+	 * <ol>
+	 *     <li>Create the optimizer DAG representation of the program: build the <tt>OptimizerNode</tt>
+	 *         representations of the operators, assign parallelism, and compute size estimates.</li>
+	 *     <li>Compute interesting properties and auxiliary structures.</li>
+	 *     <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting property
+	 *         computation (as opposed to the database approaches), because we support plans that are not trees.</li>
+	 * </ol>
+	 * 
+	 * @param program The program to be translated.
+	 * @param postPasser The function to be used for post passing the optimizer's plan and setting the
+	 *                   data type specific serialization routines.
+	 * @return The optimized plan.
+	 * 
+	 * @throws CompilerException
+	 *         Thrown, if the plan is invalid or the optimizer encountered an inconsistent
+	 *         situation during the compilation process.
+	 */
+	private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException {
+		if (program == null || postPasser == null) {
+			throw new NullPointerException();
+		}
+		
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Beginning compilation of program '" + program.getJobName() + '\'');
+		}
+
+		final ExecutionMode defaultDataExchangeMode = program.getExecutionConfig().getExecutionMode();
+
+		final int defaultParallelism = program.getDefaultParallelism() > 0 ?
+			program.getDefaultParallelism() : this.defaultDegreeOfParallelism;
+
+		// log the default settings
+		LOG.debug("Using a default parallelism of {}",  defaultParallelism);
+		LOG.debug("Using default data exchange mode {}", defaultDataExchangeMode);
+
+		// the first step in the compilation is to create the optimizer plan representation
+		// this step does the following:
+		// 1) It creates an optimizer plan node for each operator
+		// 2) It connects them via channels
+		// 3) It looks for hints about local strategies and channel types and
+		// sets the types and strategies accordingly
+		// 4) It makes estimates about the data volume of the data sources and
+		// propagates those estimates through the plan
+
+		GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
+		program.accept(graphCreator);
+
+		// if we have a plan with multiple data sinks, add logical optimizer nodes that have two data sinks as children
+		// each, until we have only a single root node. This allows us to transparently deal with plans that have
+		// multiple outputs
+		OptimizerNode rootNode;
+		if (graphCreator.getSinks().size() == 1) {
+			rootNode = graphCreator.getSinks().get(0);
+		}
+		else if (graphCreator.getSinks().size() > 1) {
+			Iterator<DataSinkNode> iter = graphCreator.getSinks().iterator();
+			rootNode = iter.next();
+
+			while (iter.hasNext()) {
+				rootNode = new SinkJoiner(rootNode, iter.next());
+			}
+		}
+		else {
+			throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
+		}
+
+		// assign IDs to the nodes and traverse the plan to compute the size and cardinality estimates,
+		// based on the data statistics (where available)
+		rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
+
+		// We are dealing with operator DAGs, rather than operator trees.
+		// That requires us to deviate at some points from the classical DB optimizer algorithms.
+		// This step builds some auxiliary structures to help track branches and joins in the DAG
+		BranchesVisitor branchingVisitor = new BranchesVisitor();
+		rootNode.accept(branchingVisitor);
+
+		// Propagate the interesting properties top-down through the graph
+		InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
+		rootNode.accept(propsVisitor);
+		
+		// perform a sanity check: the root may not have any unclosed branches
+		if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
+			throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " +
+					"track the re-joining of branches correctly.");
+		}
+
+		// the final step is now to generate the actual plan alternatives
+		List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
+
+		if (bestPlan.size() != 1) {
+			throw new CompilerException("Error in compiler: more than one best plan was created!");
+		}
+
+		// check if the best plan's root is a data sink (single sink plan)
+		// if so, directly take it. if it is a sink joiner node, get its contained sinks
+		PlanNode bestPlanRoot = bestPlan.get(0);
+		List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
+
+		if (bestPlanRoot instanceof SinkPlanNode) {
+			bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
+		} else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
+			((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
+		}
+		
+		DeadlockPreventer dp = new DeadlockPreventer();
+		dp.resolveDeadlocks(bestPlanSinks);
+
+		// finalize the plan
+		OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
+		
+		plan.accept(new BinaryUnionReplacer());
+		
+		// post pass the plan. this is the phase where the serialization and comparator code is set
+		postPasser.postPass(plan);
+		
+		return plan;
+	}
+
+	/**
+	 * This function performs only the first step of the compilation process - the creation of the optimizer
+	 * representation of the plan. No estimations or enumerations of alternatives are done here.
+	 * 
+	 * @param program The plan to generate the optimizer representation for.
+	 * @return The optimizer representation of the plan, as a collection of all data sinks
+	 *         from which the plan can be traversed.
+	 */
+	public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
+		GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(1, null);
+		program.accept(graphCreator);
+		return graphCreator.getSinks();
+	}
+
+
+	// ------------------------------------------------------------------------
+	// Miscellaneous
+	// ------------------------------------------------------------------------
+	
+	private OptimizerPostPass getPostPassFromPlan(Plan program) {
+		final String className =  program.getPostPassClassName();
+		if (className == null) {
+			throw new CompilerException("Optimizer Post Pass class description is null");
+		}
+		try {
+			Class<? extends OptimizerPostPass> clazz = Class.forName(className).asSubclass(OptimizerPostPass.class);
+			try {
+				return InstantiationUtil.instantiate(clazz, OptimizerPostPass.class);
+			} catch (RuntimeException rtex) {
+				// unwrap the source exception
+				if (rtex.getCause() != null) {
+					throw new CompilerException("Cannot instantiate optimizer post pass: " + rtex.getMessage(), rtex.getCause());
+				} else {
+					throw rtex;
+				}
+			}
+		}
+		catch (ClassNotFoundException cnfex) {
+			throw new CompilerException("Cannot load Optimizer post-pass class '" + className + "'.", cnfex);
+		}
+		catch (ClassCastException ccex) {
+			throw new CompilerException("Class '" + className + "' is not an optimizer post-pass.", ccex);
+		}
+	}
+}
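
To place the entry points above: a plan is compiled by constructing an Optimizer and calling compile(). A minimal sketch, assuming the Plan is obtained elsewhere (e.g. via ExecutionEnvironment#createProgramPlan()):

    import org.apache.flink.api.common.Plan;
    import org.apache.flink.optimizer.CompilerException;
    import org.apache.flink.optimizer.DataStatistics;
    import org.apache.flink.optimizer.Optimizer;
    import org.apache.flink.optimizer.plan.OptimizedPlan;

    public class CompileSketch {
        public static OptimizedPlan optimize(Plan program) throws CompilerException {
            // passing a DataStatistics object lets the optimizer use size estimates;
            // without it, only the heuristic cost functions are used
            Optimizer optimizer = new Optimizer(new DataStatistics());
            return optimizer.compile(program);
        }
    }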

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
new file mode 100644
index 0000000..7880734
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.costs;
+
+import java.util.Iterator;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.EstimateProvider;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.PlanNode;
+
+/**
+ * Abstract base class for a cost estimator. Defines cost estimation methods and implements the basic work
+ * method that computes the cost of an operator by adding input shipping cost, input local cost, and
+ * driver cost.
+ */
+public abstract class CostEstimator {
+	
+	public abstract void addRandomPartitioningCost(EstimateProvider estimates, Costs costs);
+	
+	public abstract void addHashPartitioningCost(EstimateProvider estimates, Costs costs);
+	
+	public abstract void addRangePartitionCost(EstimateProvider estimates, Costs costs);
+
+	public abstract void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs);
+
+	// ------------------------------------------------------------------------
+
+	public abstract void addFileInputCost(long fileSizeInBytes, Costs costs);
+	
+	public abstract void addLocalSortCost(EstimateProvider estimates, Costs costs);
+	
+	public abstract void addLocalMergeCost(EstimateProvider estimates1, EstimateProvider estimates2, Costs costs, int costWeight);
+	
+	public abstract void addHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, Costs costs, int costWeight);
+	
+	public abstract void addCachedHybridHashCosts(EstimateProvider buildSide, EstimateProvider probeSide, Costs costs, int costWeight);
+
+	public abstract void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs, int costWeight);
+
+	public abstract void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs, int costWeight);
+
+	// ------------------------------------------------------------------------
+	
+	public abstract void addArtificialDamCost(EstimateProvider estimates, long bufferSize, Costs costs);
+	
+	// ------------------------------------------------------------------------	
+
+	/**
+	 * This method computes the cost of an operator. The cost is composed of cost for input shipping,
+	 * locally processing an input, and running the operator.
+	 * 
+	 * It requires that all inputs are set and that each has a proper ship strategy assigned,
+	 * one that is not equal to <tt>NONE</tt>.
+	 * 
+	 * @param n The node to compute the costs for.
+	 */
+	public void costOperator(PlanNode n) {
+		// initialize costs objects with no costs
+		final Costs totalCosts = new Costs();
+		final long availableMemory = n.getGuaranteedAvailableMemory();
+		
+		// add the shipping strategy costs
+		for (Channel channel : n.getInputs()) {
+			final Costs costs = new Costs();
+			
+			// Plans that apply the same strategies, but at different points
+			// are equally expensive. For example, if a partitioning can be
+			// pushed below a Map function there is often no difference in plan
+			// costs between the pushed down version and the version that partitions
+			// after the Mapper. However, in those cases, we want the expensive
+			// strategy to appear later in the plan, as data reduction often occurs
+			// by large factors, while blowup is rare and typically by smaller fractions.
+			// We achieve this by adding a small penalty to the FORWARD strategy,
+			// weighted by the current plan depth (steps to the earliest data source).
+			// That way, later FORWARDs are more expensive than earlier FORWARDs.
+			// Note that this only applies to the heuristic costs.
+			
+			switch (channel.getShipStrategy()) {
+			case NONE:
+				throw new CompilerException(
+					"Cannot determine costs: Shipping strategy has not been set for an input.");
+			case FORWARD:
+//				costs.addHeuristicNetworkCost(channel.getMaxDepth());
+				break;
+			case PARTITION_RANDOM:
+				addRandomPartitioningCost(channel, costs);
+				break;
+			case PARTITION_HASH:
+			case PARTITION_CUSTOM:
+				addHashPartitioningCost(channel, costs);
+				break;
+			case PARTITION_RANGE:
+				addRangePartitionCost(channel, costs);
+				break;
+			case BROADCAST:
+				addBroadcastCost(channel, channel.getReplicationFactor(), costs);
+				break;
+			case PARTITION_FORCED_REBALANCE:
+				addRandomPartitioningCost(channel, costs);
+				break;
+			default:
+				throw new CompilerException("Unknown shipping strategy for input: " + channel.getShipStrategy());
+			}
+			
+			switch (channel.getLocalStrategy()) {
+			case NONE:
+				break;
+			case SORT:
+			case COMBININGSORT:
+				addLocalSortCost(channel, costs);
+				break;
+			default:
+				throw new CompilerException("Unsupported local strategy for input: " + channel.getLocalStrategy());
+			}
+			
+			if (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE) {
+				addArtificialDamCost(channel, 0, costs);
+			}
+			
+			// adjust with the cost weight factor
+			if (channel.isOnDynamicPath()) {
+				costs.multiplyWith(channel.getCostWeight());
+			}
+			
+			totalCosts.addCosts(costs);
+		} 
+		
+		Channel firstInput = null;
+		Channel secondInput = null;
+		Costs driverCosts = new Costs();
+		int costWeight = 1;
+		
+		// adjust with the cost weight factor
+		if (n.isOnDynamicPath()) {
+			costWeight = n.getCostWeight();
+		}
+		
+		// get the inputs, if we have some
+		{
+			Iterator<Channel> channels = n.getInputs().iterator();
+			if (channels.hasNext()) {
+				firstInput = channels.next();
+			}
+			if (channels.hasNext()) {
+				secondInput = channels.next();
+			}
+		}
+
+		// determine the local costs
+		switch (n.getDriverStrategy()) {
+		case NONE:
+		case UNARY_NO_OP:
+		case BINARY_NO_OP:	
+		case COLLECTOR_MAP:
+		case MAP:
+		case MAP_PARTITION:
+		case FLAT_MAP:
+			
+		case ALL_GROUP_REDUCE:
+		case ALL_REDUCE:
+			// these operations do not do any actual grouping, since every element is in the same single group
+			
+		case CO_GROUP:
+		case SORTED_GROUP_REDUCE:
+		case SORTED_REDUCE:
+			// grouping or co-grouping over sorted streams for free
+			
+		case SORTED_GROUP_COMBINE:
+			// partial grouping is always local and main memory resident. we should add a relative cpu cost at some point
+		case ALL_GROUP_COMBINE:
+			
+		case UNION:
+			// pipelined local union is for free
+			
+			break;
+		case MERGE:
+			addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight);
+			break;
+		case HYBRIDHASH_BUILD_FIRST:
+			addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
+			break;
+		case HYBRIDHASH_BUILD_SECOND:
+			addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
+			break;
+		case HYBRIDHASH_BUILD_FIRST_CACHED:
+			addCachedHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
+			break;
+		case HYBRIDHASH_BUILD_SECOND_CACHED:
+			addCachedHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
+			break;
+		case NESTEDLOOP_BLOCKED_OUTER_FIRST:
+			addBlockNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
+			break;
+		case NESTEDLOOP_BLOCKED_OUTER_SECOND:
+			addBlockNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
+			break;
+		case NESTEDLOOP_STREAMED_OUTER_FIRST:
+			addStreamedNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight);
+			break;
+		case NESTEDLOOP_STREAMED_OUTER_SECOND:
+			addStreamedNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight);
+			break;
+		default:
+			throw new CompilerException("Unknown local strategy: " + n.getDriverStrategy().name());
+		}
+		
+		totalCosts.addCosts(driverCosts);
+		n.setCosts(totalCosts);
+	}
+}
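
To make the composition in costOperator() concrete: per-channel costs are computed, weighted if the channel is on a dynamic path, and summed with the driver costs. A sketch with hypothetical numbers, using the Costs class from the next file:

    import org.apache.flink.optimizer.costs.Costs;

    public class CostCompositionSketch {
        public static void main(String[] args) {
            Costs total = new Costs();

            Costs channelCosts = new Costs(1000000, 0); // ship 1 MB over the network
            channelCosts.multiplyWith(3);               // channel on a dynamic path, cost weight 3
            total.addCosts(channelCosts);

            Costs driverCosts = new Costs(0, 2000000);  // driver writes and reads 2 MB locally
            total.addCosts(driverCosts);

            // networkCost=3000000.0, diskCost=2000000.0, cpuCost=0.0, ...
            System.out.println(total);
        }
    }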

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/Costs.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/Costs.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/Costs.java
new file mode 100644
index 0000000..7c854bf
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/Costs.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.costs;
+
+/**
+ * Simple class to represent the costs of an operation. The costs currently track network, I/O, and CPU costs.
+ * 
+ * Costs are composed of two kinds of cost contributors:
+ * <ol>
+ *   <li>Quantifiable costs. Those costs are used when estimates are available and track a quantifiable
+ *       measure, such as the number of bytes for network or I/O</li>
+ *   <li>Heuristic costs. Those costs are used when no estimates are available. They can be used to track that
+ *       an operator used a special operation which is heuristically considered more expensive than another
+ *       operation.</li>
+ * </ol>
+ * <p>
+ * The quantifiable costs may frequently be unknown, which is represented by a {@code -1} as a value for the unknown
+ * components of the cost. In that case, all operations' costs are unknown and it is hence not decidable which
+ * operation to favor during pruning. The heuristic costs should then contain a value to make sure that
+ * operators with different strategies are comparable, even in the absence of estimates. The heuristic
+ * costs are hence the system's mechanism of realizing pruning heuristics that favor some operations over others.
+ */
+public class Costs implements Comparable<Costs>, Cloneable {
+
+	public static final double UNKNOWN = -1;
+	
+	private double networkCost;				// network cost, in bytes to be transferred
+
+	private double diskCost;				// disk cost, in bytes to be written and read
+
+	private double cpuCost;					// CPU cost
+	
+	private double heuristicNetworkCost;
+	
+	private double heuristicDiskCost;
+	
+	private double heuristicCpuCost;
+	
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Default constructor. Initializes all costs to 0.
+	 */
+	public Costs() {
+	}
+
+	/**
+	 * Creates a new costs object using the given values for the network and storage cost.
+	 * 
+	 * @param networkCost The network cost, in bytes to be transferred.
+	 * @param diskCost The cost for disk, in bytes to be written and read.
+	 */
+	public Costs(double networkCost, double diskCost) {
+		setNetworkCost(networkCost);
+		setDiskCost(diskCost);
+	}
+	
+	/**
+	 * Creates a new costs object using the given values for the network and storage cost.
+	 * 
+	 * @param networkCost The network cost, in bytes to be transferred.
+	 * @param diskCost The cost for disk, in bytes to be written and read.
+	 * @param cpuCost The cost for CPU operations.
+	 */
+	public Costs(double networkCost, double diskCost, double cpuCost) {
+		setNetworkCost(networkCost);
+		setDiskCost(diskCost);
+		setCpuCost(cpuCost);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Gets the network cost.
+	 * 
+	 * @return The network cost, in bytes to be transferred.
+	 */
+	public double getNetworkCost() {
+		return networkCost;
+	}
+
+	/**
+	 * Sets the network cost for this Costs object.
+	 * 
+	 * @param bytes
+	 *        The network cost to set, in bytes to be transferred.
+	 */
+	public void setNetworkCost(double bytes) {
+		if (bytes == UNKNOWN || bytes >= 0) {
+			this.networkCost = bytes;
+		} else {
+			throw new IllegalArgumentException();
+		}
+	}
+	
+	/**
+	 * Adds the costs for network to the current network costs
+	 * for this Costs object.
+	 * 
+	 * @param bytes The network cost to add, in bytes to be transferred.
+	 */
+	public void addNetworkCost(double bytes) {
+		this.networkCost = (this.networkCost < 0 || bytes < 0) ? UNKNOWN : this.networkCost + bytes;
+	}
+
+	/**
+	 * Gets the costs for disk.
+	 * 
+	 * @return The disk cost, in bytes to be written and read.
+	 */
+	public double getDiskCost() {
+		return diskCost;
+	}
+
+	/**
+	 * Sets the costs for disk for this Costs object.
+	 * 
+	 * @param bytes The disk cost to set, in bytes to be written and read.
+	 */
+	public void setDiskCost(double bytes) {
+		if (bytes == UNKNOWN || bytes >= 0) {
+			this.diskCost = bytes;
+		} else {
+			throw new IllegalArgumentException();
+		}
+	}
+	
+	/**
+	 * Adds the costs for disk to the current disk costs
+	 * for this Costs object.
+	 * 
+	 * @param bytes The disk cost to add, in bytes to be written and read.
+	 */
+	public void addDiskCost(double bytes) {
+		this.diskCost = 
+			(this.diskCost < 0 || bytes < 0) ? UNKNOWN : this.diskCost + bytes;
+	}
+	
+	/**
+	 * Gets the cost for the CPU.
+	 * 
+	 * @return The CPU Cost.
+	 */
+	public double getCpuCost() {
+		return this.cpuCost;
+	}
+
+	/**
+	 * Sets the cost for the CPU.
+	 * 
+	 * @param cost The CPU Cost.
+	 */
+	public void setCpuCost(double cost) {
+		if (cost == UNKNOWN || cost >= 0) {
+			this.cpuCost = cost;
+		} else {
+			throw new IllegalArgumentException();
+		}
+	}
+	
+	/**
+	 * Adds the given CPU cost to the current CPU cost for this Costs object.
+	 * 
+	 * @param cost The CPU cost to add.
+	 */
+	public void addCpuCost(double cost) {
+		this.cpuCost = 
+			(this.cpuCost < 0 || cost < 0) ? UNKNOWN : this.cpuCost + cost;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Gets the heuristic network cost.
+	 * 
+	 * @return The heuristic network cost, in bytes to be transferred.
+	 */
+	public double getHeuristicNetworkCost() {
+		return this.heuristicNetworkCost;
+	}
+
+	/**
+	 * Sets the heuristic network cost for this Costs object.
+	 * 
+	 * @param cost The heuristic network cost to set, in bytes to be transferred.
+	 */
+	public void setHeuristicNetworkCost(double cost) {
+		if (cost <= 0) {
+			throw new IllegalArgumentException("Heuristic costs must be positive.");
+		}
+		this.heuristicNetworkCost = cost;
+	}
+	
+	/**
+	 * Adds the heuristic costs for network to the current heuristic network costs
+	 * for this Costs object.
+	 * 
+	 * @param cost The heuristic network cost to add.
+	 */
+	public void addHeuristicNetworkCost(double cost) {
+		if (cost <= 0) {
+			throw new IllegalArgumentException("Heuristic costs must be positive.");
+		}
+		this.heuristicNetworkCost += cost;
+		// check for overflow
+		if (this.heuristicNetworkCost < 0) {
+			this.heuristicNetworkCost = Double.MAX_VALUE;
+		}
+	}
+
+	/**
+	 * Gets the heuristic costs for disk.
+	 * 
+	 * @return The heuristic disk cost.
+	 */
+	public double getHeuristicDiskCost() {
+		return this.heuristicDiskCost;
+	}
+
+	/**
+	 * Sets the heuristic costs for disk for this Costs object.
+	 * 
+	 * @param cost The heuristic disk cost to set.
+	 */
+	public void setHeuristicDiskCost(double cost) {
+		if (cost <= 0) {
+			throw new IllegalArgumentException("Heuristic costs must be positive.");
+		}
+		this.heuristicDiskCost = cost;
+	}
+	
+	/**
+	 * Adds the heuristic costs for disk to the current heuristic disk costs
+	 * for this Costs object.
+	 * 
+	 * @param cost The heuristic disk cost to add.
+	 */
+	public void addHeuristicDiskCost(double cost) {
+		if (cost <= 0) {
+			throw new IllegalArgumentException("Heuristic costs must be positive.");
+		}
+		this.heuristicDiskCost += cost;
+		// check for overflow
+		if (this.heuristicDiskCost < 0) {
+			this.heuristicDiskCost = Double.MAX_VALUE;
+		}
+	}
+	
+	/**
+	 * Gets the heuristic cost for the CPU.
+	 * 
+	 * @return The heuristic CPU Cost.
+	 */
+	public double getHeuristicCpuCost() {
+		return this.heuristicCpuCost;
+	}
+
+	/**
+	 * Sets the heuristic cost for the CPU.
+	 * 
+	 * @param cost The heuristic CPU Cost.
+	 */
+	public void setHeuristicCpuCost(double cost) {
+		if (cost <= 0) {
+			throw new IllegalArgumentException("Heuristic costs must be positive.");
+		}
+		this.heuristicCpuCost = cost;
+	}
+	
+	/**
+	 * Adds the given heuristic CPU cost to the current heuristic CPU cost for this Costs object.
+	 * 
+	 * @param cost The heuristic CPU cost to add.
+	 */
+	public void addHeuristicCpuCost(double cost) {
+		if (cost <= 0) {
+			throw new IllegalArgumentException("Heuristic costs must be positive.");
+		}
+		this.heuristicCpuCost += cost;
+		// check for overflow
+		if (this.heuristicCpuCost < 0) {
+			this.heuristicCpuCost = Double.MAX_VALUE;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Adds the given costs to these costs. If, for any of the cost components (network, disk, CPU),
+	 * the costs are unknown, the resulting costs will be unknown.
+	 * 
+	 * @param other The costs to add.
+	 */
+	public void addCosts(Costs other) {
+		// ---------- quantifiable costs ----------
+		if (this.networkCost == UNKNOWN || other.networkCost == UNKNOWN) {
+			this.networkCost = UNKNOWN;
+		} else {
+			this.networkCost += other.networkCost;
+		}
+		
+		if (this.diskCost == UNKNOWN || other.diskCost == UNKNOWN) {
+			this.diskCost = UNKNOWN;
+		} else {
+			this.diskCost += other.diskCost;
+		}
+		
+		if (this.cpuCost == UNKNOWN || other.cpuCost == UNKNOWN) {
+			this.cpuCost = UNKNOWN;
+		} else {
+			this.cpuCost += other.cpuCost;
+		}
+		
+		// ---------- heuristic costs ----------
+		
+		this.heuristicNetworkCost += other.heuristicNetworkCost;
+		this.heuristicDiskCost += other.heuristicDiskCost;
+		this.heuristicCpuCost += other.heuristicCpuCost;
+	}
+	
+	/**
+	 * Subtracts the given costs from these costs. If the given costs are unknown, these costs remain unchanged.
+	 *  
+	 * @param other The costs to subtract.
+	 */
+	public void subtractCosts(Costs other) {
+		if (this.networkCost != UNKNOWN && other.networkCost != UNKNOWN) {
+			this.networkCost -= other.networkCost;
+			if (this.networkCost < 0) {
+				throw new IllegalArgumentException("Cannot subtract more cost than there is.");
+			}
+		}
+		if (this.diskCost != UNKNOWN && other.diskCost != UNKNOWN) {
+			this.diskCost -= other.diskCost;
+			if (this.diskCost < 0) {
+				throw new IllegalArgumentException("Cannot subtract more cost than there is.");
+			}
+		}
+		if (this.cpuCost != UNKNOWN && other.cpuCost != UNKNOWN) {
+			this.cpuCost -= other.cpuCost;
+			if (this.cpuCost < 0) {
+				throw new IllegalArgumentException("Cannot subtract more cost than there is.");
+			}
+		}
+		
+		// ---------- heuristic costs ----------
+		
+		this.heuristicNetworkCost -= other.heuristicNetworkCost;
+		if (this.heuristicNetworkCost < 0) {
+			throw new IllegalArgumentException("Cannot subtract more cost than there is.");
+		}
+		this.heuristicDiskCost -= other.heuristicDiskCost;
+		if (this.heuristicDiskCost < 0) {
+			throw new IllegalArgumentException("Cannot subtract more cost than there is.");
+		}
+		this.heuristicCpuCost -= other.heuristicCpuCost;
+		if (this.heuristicCpuCost < 0) {
+			throw new IllegalArgumentException("Cannot subtract more cost than there is.");
+		}
+	}
+	
+	public void multiplyWith(int factor) {
+		this.networkCost = this.networkCost < 0 ? UNKNOWN : this.networkCost * factor;
+		this.diskCost = this.diskCost < 0 ? UNKNOWN : this.diskCost * factor;
+		this.cpuCost = this.cpuCost < 0 ? UNKNOWN : this.cpuCost * factor;
+		this.heuristicNetworkCost = this.heuristicNetworkCost < 0 ? UNKNOWN : this.heuristicNetworkCost * factor;
+		this.heuristicDiskCost = this.heuristicDiskCost < 0 ? UNKNOWN : this.heuristicDiskCost * factor;
+		this.heuristicCpuCost = this.heuristicCpuCost < 0 ? UNKNOWN : this.heuristicCpuCost * factor;
+	}
+
+	public void divideBy(int factor) {
+		this.networkCost = this.networkCost < 0 ? UNKNOWN : this.networkCost / factor;
+		this.diskCost = this.diskCost < 0 ? UNKNOWN : this.diskCost / factor;
+		this.cpuCost = this.cpuCost < 0 ? UNKNOWN : this.cpuCost / factor;
+		this.heuristicNetworkCost = this.heuristicNetworkCost < 0 ? UNKNOWN : this.heuristicNetworkCost / factor;
+		this.heuristicDiskCost = this.heuristicDiskCost < 0 ? UNKNOWN : this.heuristicDiskCost / factor;
+		this.heuristicCpuCost = this.heuristicCpuCost < 0 ? UNKNOWN : this.heuristicCpuCost / factor;
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * The order of comparison is: network first, then disk, then CPU. For each component, the quantifiable
+	 * costs are compared if they are known on both sides; otherwise, the heuristic costs are compared.
+	 * 
+	 * @see java.lang.Comparable#compareTo(java.lang.Object)
+	 */
+	@Override
+	public int compareTo(Costs o) {
+		// check the network cost. if we have actual costs on both, use them, otherwise use the heuristic costs.
+		if (this.networkCost != UNKNOWN && o.networkCost != UNKNOWN) {
+			if (this.networkCost != o.networkCost) {
+				return this.networkCost < o.networkCost ? -1 : 1;
+			}
+		} else if (this.heuristicNetworkCost < o.heuristicNetworkCost) {
+			return -1;
+		} else if (this.heuristicNetworkCost > o.heuristicNetworkCost) {
+			return 1;
+		}
+		
+		// next, check the disk cost. again, if we have actual costs on both, use them, otherwise use the heuristic costs.
+		if (this.diskCost != UNKNOWN && o.diskCost != UNKNOWN) {
+			if (this.diskCost != o.diskCost) {
+				return this.diskCost < o.diskCost ? -1 : 1;
+			}
+		} else if (this.heuristicDiskCost < o.heuristicDiskCost) {
+			return -1;
+		} else if (this.heuristicDiskCost > o.heuristicDiskCost) {
+			return 1;
+		}
+		
+		// finally, check the CPU cost. again, if we have actual costs on both, use them, otherwise use the heuristic costs.
+		if (this.cpuCost != UNKNOWN && o.cpuCost != UNKNOWN) {
+			return this.cpuCost < o.cpuCost ? -1 : this.cpuCost > o.cpuCost ? 1 : 0;
+		} else if (this.heuristicCpuCost < o.heuristicCpuCost) {
+			return -1;
+		} else if (this.heuristicCpuCost > o.heuristicCpuCost) {
+			return 1;
+		} else {
+			return 0;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		final int prime = 31;
+		int result = 1;
+		long cpuCostBits = Double.doubleToLongBits(cpuCost);
+		long heuristicCpuCostBits = Double.doubleToLongBits(heuristicCpuCost);
+		long heuristicNetworkCostBits = Double.doubleToLongBits(heuristicNetworkCost);
+		long heuristicDiskCostBits = Double.doubleToLongBits(heuristicDiskCost);
+		long networkCostBits = Double.doubleToLongBits(networkCost);
+		long diskCostBits = Double.doubleToLongBits(diskCost);
+
+		result = prime * result + (int) (cpuCostBits ^ (cpuCostBits >>> 32));
+		result = prime * result + (int) (heuristicCpuCostBits ^ (heuristicCpuCostBits >>> 32));
+		result = prime * result + (int) (heuristicNetworkCostBits ^ (heuristicNetworkCostBits >>> 32));
+		result = prime * result + (int) (heuristicDiskCostBits ^ (heuristicDiskCostBits >>> 32));
+		result = prime * result + (int) (networkCostBits ^ (networkCostBits >>> 32));
+		result = prime * result + (int) (diskCostBits ^ (diskCostBits >>> 32));
+		return result;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj.getClass() == getClass()) {
+			final Costs other = (Costs) obj;
+			return this.networkCost == other.networkCost &&
+					this.diskCost == other.diskCost &&
+					this.cpuCost == other.cpuCost &&
+					this.heuristicNetworkCost == other.heuristicNetworkCost &&
+					this.heuristicDiskCost == other.heuristicDiskCost &&
+					this.heuristicCpuCost == other.heuristicCpuCost;
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "Costs [networkCost=" + networkCost + ", diskCost=" + diskCost + 
+				", cpuCost=" + cpuCost + ", heuristicNetworkCost=" + heuristicNetworkCost + 
+				", heuristicDiskCost=" + heuristicDiskCost + ", heuristicCpuCost=" + heuristicCpuCost + "]";
+	}
+
+	@Override
+	public Costs clone() {
+		try {
+			return (Costs) super.clone();
+		} catch (CloneNotSupportedException e) {
+			throw new RuntimeException(e);	// should never happen
+		}
+	}
+}
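
The UNKNOWN propagation and the heuristic fallback in compareTo() are easiest to see with concrete (hypothetical) values:

    import org.apache.flink.optimizer.costs.Costs;

    public class CostsSemanticsSketch {
        public static void main(String[] args) {
            // adding an unknown component makes the sum unknown
            Costs known = new Costs(500, 100);
            Costs partlyUnknown = new Costs(Costs.UNKNOWN, 200);
            known.addCosts(partlyUnknown);
            System.out.println(known.getNetworkCost()); // -1.0 (UNKNOWN)
            System.out.println(known.getDiskCost());    // 300.0

            // with unknown quantifiable costs, compareTo falls back to the heuristic costs
            Costs a = new Costs(Costs.UNKNOWN, 0);
            Costs b = new Costs(Costs.UNKNOWN, 0);
            a.addHeuristicNetworkCost(1);
            b.addHeuristicNetworkCost(2);
            System.out.println(a.compareTo(b)); // -1: a is considered cheaper
        }
    }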

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/DefaultCostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/DefaultCostEstimator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/DefaultCostEstimator.java
new file mode 100644
index 0000000..543f330
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/DefaultCostEstimator.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.costs;
+
+import org.apache.flink.optimizer.dag.EstimateProvider;
+
+/**
+ * A default cost estimator that has access to basic size and cardinality estimates.
+ * <p>
+ * This estimator works with actual estimates (as far as they are available) and falls back to setting
+ * relative costs if no estimates are available. That way, the estimator makes sure that plans with
+ * different strategies are costed differently, even in the absence of estimates. The different relative
+ * costs in the absence of estimates represent this estimator's heuristic guidance towards certain strategies.
+ * <p>
+ * For robustness reasons, we always assume that the whole data is shipped during a repartition step. We deviate from
+ * the typical estimate of <code>(n - 1) / n</code> (with <i>n</i> being the number of nodes), because for a parallelism
+ * of 1, that would yield a shipping of zero bytes. While this is usually correct, the runtime scheduling may still
+ * choose to move tasks to different nodes, so we cannot be certain that no data is shipped.
+ */
+public class DefaultCostEstimator extends CostEstimator {
+	
+	/**
+	 * The base for the estimation of all relative costs. We heuristically pick a very large data volume,
+	 * which will favor strategies that are less expensive on large data volumes. This is robust and yields
+	 * a consistent relative ordering of strategies even when no size estimates are available.
+	 */
+	private static final long HEURISTIC_COST_BASE = 1000000000L;
+	
+	// The numbers for the CPU effort are rather magic at the moment and should be treated as ordinal (relative), not absolute
+	
+	private static final float MATERIALIZATION_CPU_FACTOR = 1;
+	
+	private static final float HASHING_CPU_FACTOR = 4;
+	
+	private static final float SORTING_CPU_FACTOR = 9;
+	
+	
+	// --------------------------------------------------------------------------------------------
+	// Shipping Strategy Cost
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void addRandomPartitioningCost(EstimateProvider estimates, Costs costs) {
+		// conservative estimate: we need to ship the whole data over the network to establish the
+		// partitioning. no disk costs.
+		final long estOutShipSize = estimates.getEstimatedOutputSize();
+		if (estOutShipSize <= 0) {
+			costs.setNetworkCost(Costs.UNKNOWN);
+		} else {
+			costs.addNetworkCost(estOutShipSize);
+		}
+		costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE);
+	}
+	
+	@Override
+	public void addHashPartitioningCost(EstimateProvider estimates, Costs costs) {
+		// conservative estimate: we need to ship the whole data over the network to establish the
+		// partitioning. no disk costs.
+		final long estOutShipSize = estimates.getEstimatedOutputSize();
+		if (estOutShipSize <= 0) {
+			costs.setNetworkCost(Costs.UNKNOWN);
+		} else {
+			costs.addNetworkCost(estOutShipSize);
+		}
+		costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE);
+	}
+	
+	@Override
+	public void addRangePartitionCost(EstimateProvider estimates, Costs costs) {
+		final long dataSize = estimates.getEstimatedOutputSize();
+		if (dataSize > 0) {
+			// assume sampling of 10% of the data, which is shipped and spilled to disk
+			final long sampled = (long) (dataSize * 0.1f);
+			// set shipping costs: the full data plus the shipped sample
+			costs.addNetworkCost(dataSize + sampled);
+			// the sample is spilled to disk and read back once
+			costs.addDiskCost(2 * sampled);
+		} else {
+			costs.setNetworkCost(Costs.UNKNOWN);
+			costs.setDiskCost(Costs.UNKNOWN);
+		}
+		
+		// for the heuristic costs, use the same sampling assumption as above
+		final long sampled = (long) (HEURISTIC_COST_BASE * 0.1f);
+		costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE + sampled);
+		costs.addHeuristicDiskCost(2 * sampled);
+	}
+
+	@Override
+	public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) {
+		// we cannot calculate broadcast costs without a positive replication factor
+		if (replicationFactor <= 0) {
+			throw new IllegalArgumentException("The replication factor must be larger than zero.");
+		}
+
+		// assumption: we need to ship the whole data over the network to each node.
+		final long estOutShipSize = estimates.getEstimatedOutputSize();
+		if (estOutShipSize <= 0) {
+			costs.setNetworkCost(Costs.UNKNOWN);
+		} else {
+			costs.addNetworkCost(replicationFactor * estOutShipSize);
+		}
+		costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 10 * replicationFactor);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	// Local Strategy Cost
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public void addFileInputCost(long fileSizeInBytes, Costs costs) {
+		if (fileSizeInBytes >= 0) {
+			costs.addDiskCost(fileSizeInBytes);
+		} else {
+			costs.setDiskCost(Costs.UNKNOWN);
+		}
+		costs.addHeuristicDiskCost(HEURISTIC_COST_BASE);
+	}
+	
+	@Override
+	public void addLocalSortCost(EstimateProvider estimates, Costs costs) {
+		final long s = estimates.getEstimatedOutputSize();
+		// we assume a two phase merge sort, so all in all 2 I/O operations per block
+		if (s <= 0) {
+			costs.setDiskCost(Costs.UNKNOWN);
+			costs.setCpuCost(Costs.UNKNOWN);
+		} else {
+			costs.addDiskCost(2 * s);
+			costs.addCpuCost((long) (s * SORTING_CPU_FACTOR));
+		}
+		costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
+		costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * SORTING_CPU_FACTOR));
+	}
+
+	@Override
+	public void addLocalMergeCost(EstimateProvider input1, EstimateProvider input2, Costs costs, int costWeight) {
+		// costs nothing. the very rarely incurred cost of a spilling block nested loops join in the
+		// presence of massively recurring duplicate keys is ignored, because it cannot be assessed
+	}
+
+	@Override
+	public void addHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, Costs costs, int costWeight) {
+		long bs = buildSideInput.getEstimatedOutputSize();
+		long ps = probeSideInput.getEstimatedOutputSize();
+		
+		if (bs > 0 && ps > 0) {
+			long overall = 2*bs + ps;
+			costs.addDiskCost(overall);
+			costs.addCpuCost((long) (overall * HASHING_CPU_FACTOR));
+		} else {
+			costs.setDiskCost(Costs.UNKNOWN);
+			costs.setCpuCost(Costs.UNKNOWN);
+		}
+		costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
+		costs.addHeuristicCpuCost((long) (2 * HEURISTIC_COST_BASE * HASHING_CPU_FACTOR));
+		
+		// cost weight applies to everything
+		costs.multiplyWith(costWeight);
+	}
+	
+	/**
+	 * Calculates the costs for the cached variant of the hybrid hash join.
+	 * We assume by default that half of the cached hash table fits into memory.
+	 */
+	@Override
+	public void addCachedHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, Costs costs, int costWeight) {
+		if (costWeight < 1) {
+			throw new IllegalArgumentException("The cost weight must be at least one.");
+		}
+		
+		long bs = buildSideInput.getEstimatedOutputSize();
+		long ps = probeSideInput.getEstimatedOutputSize();
+		
+		if (bs > 0 && ps > 0) {
+			long overall = 2*bs + costWeight*ps;
+			costs.addDiskCost(overall);
+			costs.addCpuCost((long) (overall * HASHING_CPU_FACTOR));
+		} else {
+			costs.setDiskCost(Costs.UNKNOWN);
+			costs.setCpuCost(Costs.UNKNOWN);
+		}
+		
+		// the build side once, plus cost-weight times the probe side
+		costs.addHeuristicDiskCost((1 + costWeight) * HEURISTIC_COST_BASE);
+		costs.addHeuristicCpuCost((long) ((1 + costWeight) * HEURISTIC_COST_BASE * HASHING_CPU_FACTOR));
+	}
+
+	@Override
+	public void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs, int costWeight) {
+		long is = innerSide.getEstimatedOutputSize(); 
+		long oc = outerSide.getEstimatedNumRecords();
+		
+		if (is > 0 && oc >= 0) {
+			// costs, if the inner side cannot be cached
+			if (is > bufferSize) {
+				costs.addDiskCost(oc * is);
+			}
+			costs.addCpuCost((long) (oc * is * MATERIALIZATION_CPU_FACTOR));
+		} else {
+			costs.setDiskCost(Costs.UNKNOWN);
+			costs.setCpuCost(Costs.UNKNOWN);
+		}
+		
+		// hack: assume 100k loops (should be expensive enough)
+		costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 100000);
+		costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * 100000 * MATERIALIZATION_CPU_FACTOR));
+		costs.multiplyWith(costWeight);
+	}
+
+	@Override
+	public void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs, int costWeight) {
+		long is = innerSide.getEstimatedOutputSize(); 
+		long os = outerSide.getEstimatedOutputSize();
+		
+		if (is > 0 && os > 0) {
+			long loops = Math.max(os / blockSize, 1);
+			costs.addDiskCost(loops * is);
+			costs.addCpuCost((long) (loops * is * MATERIALIZATION_CPU_FACTOR));
+		} else {
+			costs.setDiskCost(Costs.UNKNOWN);
+			costs.setCpuCost(Costs.UNKNOWN);
+		}
+		
+		// hack: assume 1k loops (much cheaper than the streamed variant!)
+		costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 1000);
+		costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * 1000 * MATERIALIZATION_CPU_FACTOR));
+		costs.multiplyWith(costWeight);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Damming Cost
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public void addArtificialDamCost(EstimateProvider estimates, long bufferSize, Costs costs) {
+		final long s = estimates.getEstimatedOutputSize();
+		// we assume spilling and re-reading
+		if (s <= 0) {
+			costs.setDiskCost(Costs.UNKNOWN);
+			costs.setCpuCost(Costs.UNKNOWN);
+		} else {
+			costs.addDiskCost(2 * s);
+			costs.addCpuCost((long) (s * MATERIALIZATION_CPU_FACTOR));
+		}
+		costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
+		costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * MATERIALIZATION_CPU_FACTOR));
+	}
+}
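
The heuristic constants above fix the relative order of strategies even when
no size estimates exist. The following self-contained sketch (an editor's
illustration; the class name and sample sizes are invented, while the
formulas mirror addBroadcastCost, addHashPartitioningCost,
addRangePartitionCost, addCachedHybridHashCosts, and the two nested-loops
methods) reproduces the arithmetic:

    public class CostModelSanityCheck {
        private static final long BASE = 1_000_000_000L;  // HEURISTIC_COST_BASE

        public static void main(String[] args) {
            // heuristic network cost: broadcast to n nodes vs. hash repartition
            int n = 4;
            long broadcast = BASE * 10 * n;   // as in addBroadcastCost
            long repartition = BASE;          // as in addHashPartitioningCost
            System.out.println(broadcast > repartition);  // true: repartition preferred

            // range partitioning with a known input size of 1 GB
            long dataSize = 1L << 30;
            long sampled = (long) (dataSize * 0.1f);  // the 10% sample
            long network = dataSize + sampled;        // full data + shipped sample
            long disk = 2 * sampled;                  // sample spilled and read back
            System.out.println(network + " network, " + disk + " disk");

            // cached hybrid hash join: probe side re-read costWeight times
            long bs = 200_000_000L, ps = 600_000_000L;
            int costWeight = 3;
            long hashDisk = 2 * bs + costWeight * ps; // 2*bs + costWeight*ps
            System.out.println(hashDisk);             // 2200000000

            // nested loops: the streamed variant assumes 100k loops, the block variant 1k
            long streamed = BASE * 100_000L;  // addStreamedNestedLoopsCosts heuristic
            long blocked = BASE * 1_000L;     // addBlockNestedLoopsCosts heuristic
            System.out.println(streamed / blocked);  // 100: block variant preferred
        }
    }

So even without estimates, a plan that broadcasts is charged strictly more
heuristic network cost than one that repartitions, which is exactly the
"heuristic guidance" the class comment describes.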

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
new file mode 100644
index 0000000..d199ae7
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.util.Visitor;
+
+/**
+ * The optimizer's internal representation of the partial solution that is input to a bulk iteration.
+ */
+public abstract class AbstractPartialSolutionNode extends OptimizerNode {
+	
+	protected AbstractPartialSolutionNode(Operator<?> contract) {
+		super(contract);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	protected void copyEstimates(OptimizerNode node) {
+		this.estimatedNumRecords = node.estimatedNumRecords;
+		this.estimatedOutputSize = node.estimatedOutputSize;
+	}
+	
+	public abstract IterationNode getIterationNode();
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public boolean isOnDynamicPath() {
+		return true;
+	}
+	
+	public void identifyDynamicPath(int costWeight) {
+		this.onDynamicPath = true;
+		this.costWeight = costWeight;
+	}
+
+	@Override
+	public List<DagConnection> getIncomingConnections() {
+		return Collections.emptyList();
+	}
+
+	@Override
+	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode dataExchangeMode) {}
+
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		// we do nothing here, because the estimates can only be copied from the iteration input
+	}
+	
+	@Override
+	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
+		// no children, so nothing to compute
+	}
+
+	@Override
+	public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
+		if (this.cachedPlans != null) {
+			return this.cachedPlans;
+		} else {
+			throw new IllegalStateException("The candidate plans for the partial solution have not been instantiated.");
+		}
+	}
+
+	@Override
+	public SemanticProperties getSemanticProperties() {
+		return new EmptySemanticProperties();
+	}
+	
+	@Override
+	protected void readStubAnnotations() {}
+
+	@Override
+	public void accept(Visitor<OptimizerNode> visitor) {
+		if (visitor.preVisit(this)) {
+			visitor.postVisit(this);
+		}
+	}
+}
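
The accept method above shows the optimizer's pre/post-order visitor
protocol: postVisit runs only if preVisit returned true, and a partial
solution has no inputs to descend into. A minimal hypothetical visitor that
counts the DAG nodes it is shown (the class is an editor's sketch, not part
of Flink; the Visitor interface is the one imported in the file above):

    import org.apache.flink.optimizer.dag.OptimizerNode;
    import org.apache.flink.util.Visitor;

    public class NodeCounter implements Visitor<OptimizerNode> {
        private int count;

        @Override
        public boolean preVisit(OptimizerNode visitable) {
            count++;
            return true;  // continue the descent and receive the postVisit call
        }

        @Override
        public void postVisit(OptimizerNode visitable) {
            // nothing to do for a plain count
        }

        public int getCount() {
            return count;
        }
    }

A caller would pass the visitor to accept and then read getCount(); on a
partial-solution node this yields 1, matching the implementation above,
which visits only the node itself.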

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
new file mode 100644
index 0000000..068799e
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.costs.CostEstimator;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.InterestingProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.operators.BinaryUnionOpDescriptor;
+import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.NamedChannel;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * The Optimizer representation of a binary <i>Union</i>.
+ */
+public class BinaryUnionNode extends TwoInputNode {
+	
+	private Set<RequestedGlobalProperties> channelProps;
+
+	public BinaryUnionNode(Union<?> union){
+		super(union);
+	}
+
+	@Override
+	public String getName() {
+		return "Union";
+	}
+
+	@Override
+	protected List<OperatorDescriptorDual> getPossibleProperties() {
+		return Collections.emptyList();
+	}
+	
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
+		long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
+		this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 + card2;
+		
+		long size1 = getFirstPredecessorNode().getEstimatedOutputSize();
+		long size2 = getSecondPredecessorNode().getEstimatedOutputSize();
+		this.estimatedOutputSize = (size1 < 0 || size2 < 0) ? -1 : size1 + size2;
+	}
+	
+	@Override
+	public void computeUnionOfInterestingPropertiesFromSuccessors() {
+		super.computeUnionOfInterestingPropertiesFromSuccessors();
+		// clear all local properties, as they are destroyed by the union anyway
+		getInterestingProperties().getLocalProperties().clear();
+	}
+	
+	@Override
+	public void computeInterestingPropertiesForInputs(CostEstimator estimator) { 
+		final InterestingProperties props = getInterestingProperties();
+		
+		// if no other properties exist, add the pruned trivials back
+		if (props.getGlobalProperties().isEmpty()) {
+			props.addGlobalProperties(new RequestedGlobalProperties());
+		}
+		props.addLocalProperties(new RequestedLocalProperties());
+		this.input1.setInterestingProperties(props.clone());
+		this.input2.setInterestingProperties(props.clone());
+		
+		this.channelProps = props.getGlobalProperties();
+	}
+	
+	@Override
+	public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
+		// check if we have a cached version
+		if (this.cachedPlans != null) {
+			return this.cachedPlans;
+		}
+
+		// step down to all producer nodes and calculate alternative plans
+		final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator);
+		final List<? extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator);
+
+		List<DagConnection> broadcastConnections = getBroadcastConnections();
+		if (broadcastConnections != null && !broadcastConnections.isEmpty()) {
+			throw new CompilerException("Found BroadcastVariables on a Union operation");
+		}
+		
+		final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
+
+		final List<Set<? extends NamedChannel>> broadcastPlanChannels = Collections.emptyList();
+
+		final BinaryUnionOpDescriptor operator = new BinaryUnionOpDescriptor();
+		final RequestedLocalProperties noLocalProps = new RequestedLocalProperties();
+
+		final ExecutionMode input1Mode = this.input1.getDataExchangeMode();
+		final ExecutionMode input2Mode = this.input2.getDataExchangeMode();
+
+		final int dop = getParallelism();
+		final int inDop1 = getFirstPredecessorNode().getParallelism();
+		final int inDop2 = getSecondPredecessorNode().getParallelism();
+
+		final boolean dopChange1 = dop != inDop1;
+		final boolean dopChange2 = dop != inDop2;
+
+		final boolean input1breakPipeline = this.input1.isBreakingPipeline();
+		final boolean input2breakPipeline = this.input2.isBreakingPipeline();
+
+		// enumerate all pairwise combination of the children's plans together with
+		// all possible operator strategy combination
+		
+		// create all candidates
+		for (PlanNode child1 : subPlans1) {
+			for (PlanNode child2 : subPlans2) {
+				
+				// check that the children go together, i.e., that they build upon the
+				// same candidate at the branching point of the plan.
+				if (!areBranchCompatible(child1, child2)) {
+					continue;
+				}
+				
+				for (RequestedGlobalProperties igps: this.channelProps) {
+					// create a candidate channel for the first input. mark it cached, if the connection says so
+					Channel c1 = new Channel(child1, this.input1.getMaterializationMode());
+					if (this.input1.getShipStrategy() == null) {
+						// free to choose the ship strategy
+						igps.parameterizeChannel(c1, dopChange1, input1Mode, input1breakPipeline);
+						
+						// if the DOP changed, make sure that we cancel out properties, unless the
+						// ship strategy preserves/establishes them even under changing DOPs
+						if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
+							c1.getGlobalProperties().reset();
+						}
+					}
+					else {
+						// ship strategy fixed by compiler hint
+						ShipStrategyType shipStrategy = this.input1.getShipStrategy();
+						DataExchangeMode exMode = DataExchangeMode.select(input1Mode, shipStrategy, input1breakPipeline);
+						if (this.keys1 != null) {
+							c1.setShipStrategy(this.input1.getShipStrategy(), this.keys1.toFieldList(), exMode);
+						} else {
+							c1.setShipStrategy(this.input1.getShipStrategy(), exMode);
+						}
+						
+						if (dopChange1) {
+							c1.adjustGlobalPropertiesForFullParallelismChange();
+						}
+					}
+					
+					// create a candidate channel for the second input. mark it cached, if the connection says so
+					Channel c2 = new Channel(child2, this.input2.getMaterializationMode());
+					if (this.input2.getShipStrategy() == null) {
+						// free to choose the ship strategy
+						igps.parameterizeChannel(c2, dopChange2, input2Mode, input2breakPipeline);
+						
+						// if the DOP changed, make sure that we cancel out properties, unless the
+						// ship strategy preserves/establishes them even under changing DOPs
+						if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
+							c2.getGlobalProperties().reset();
+						}
+					} else {
+						// ship strategy fixed by compiler hint
+						ShipStrategyType shipStrategy = this.input2.getShipStrategy();
+						DataExchangeMode exMode = DataExchangeMode.select(input2Mode, shipStrategy, input2breakPipeline);
+						if (this.keys2 != null) {
+							c2.setShipStrategy(this.input2.getShipStrategy(), this.keys2.toFieldList(), exMode);
+						} else {
+							c2.setShipStrategy(this.input2.getShipStrategy(), exMode);
+						}
+						
+						if (dopChange2) {
+							c2.adjustGlobalPropertiesForFullParallelismChange();
+						}
+					}
+					
+					// get the global properties and clear unique fields (not preserved anyways during the union)
+					GlobalProperties p1 = c1.getGlobalProperties();
+					GlobalProperties p2 = c2.getGlobalProperties();
+					p1.clearUniqueFieldCombinations();
+					p2.clearUniqueFieldCombinations();
+					
+					// adjust the partitionings if they exist but are not equal. this may happen when both channels have a
+					// partitioning that fulfills the requirements, but the two are incompatible with each other. For example,
+					// the requirement may be ANY_PARTITIONING on field (0), with one channel range-partitioned on that
+					// field and the other hash-partitioned on it.
+					if (!igps.isTrivial() && !(p1.equals(p2))) {
+						if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() != ShipStrategyType.FORWARD) {
+							// adjust c2 to c1
+							c2 = c2.clone();
+							p1.parameterizeChannel(c2, dopChange2, input2Mode, input2breakPipeline);
+						}
+						else if (c2.getShipStrategy() == ShipStrategyType.FORWARD && c1.getShipStrategy() != ShipStrategyType.FORWARD) {
+							// adjust c1 to c2
+							c1 = c1.clone();
+							p2.parameterizeChannel(c1, dopChange1, input1Mode, input1breakPipeline);
+						}
+						else if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() == ShipStrategyType.FORWARD) {
+							boolean adjustC1 = c1.getEstimatedOutputSize() <= 0 || c2.getEstimatedOutputSize() <= 0 ||
+									c1.getEstimatedOutputSize() <= c2.getEstimatedOutputSize();
+							if (adjustC1) {
+								c2 = c2.clone();
+								p1.parameterizeChannel(c2, dopChange2, input2Mode, input2breakPipeline);
+							} else {
+								c1 = c1.clone();
+								p2.parameterizeChannel(c1, dopChange1, input1Mode, input1breakPipeline);
+							}
+						} else {
+							// this should never happen, as it implies both realize a different strategy, which is
+							// excluded by the check that the required strategies must match
+							throw new CompilerException("Bug in Plan Enumeration for Union Node.");
+						}
+					}
+
+					instantiate(operator, c1, c2, broadcastPlanChannels, outputPlans, estimator, igps, igps, noLocalProps, noLocalProps);
+				}
+			}
+		}
+
+		// cost and prune the plans
+		for (PlanNode node : outputPlans) {
+			estimator.costOperator(node);
+		}
+		prunePlanAlternatives(outputPlans);
+		outputPlans.trimToSize();
+
+		this.cachedPlans = outputPlans;
+		return outputPlans;
+	}
+	
+	@Override
+	protected void readStubAnnotations() {}
+
+	@Override
+	public SemanticProperties getSemanticProperties() {
+		return new UnionSemanticProperties();
+	}
+	
+	@Override
+	public void computeOutputEstimates(DataStatistics statistics) {
+		OptimizerNode in1 = getFirstPredecessorNode();
+		OptimizerNode in2 = getSecondPredecessorNode();
+		
+		this.estimatedNumRecords = in1.estimatedNumRecords > 0 && in2.estimatedNumRecords > 0 ?
+				in1.estimatedNumRecords + in2.estimatedNumRecords : -1;
+		this.estimatedOutputSize = in1.estimatedOutputSize > 0 && in2.estimatedOutputSize > 0 ?
+				in1.estimatedOutputSize + in2.estimatedOutputSize : -1;
+	}
+
+	public static class UnionSemanticProperties implements SemanticProperties {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public FieldSet getForwardingTargetFields(int input, int sourceField) {
+			if (input != 0 && input != 1) {
+				throw new IndexOutOfBoundsException("Invalid input index for binary union node.");
+			}
+
+			return new FieldSet(sourceField);
+		}
+
+		@Override
+		public int getForwardingSourceField(int input, int targetField) {
+			if (input != 0 && input != 1) {
+				throw new IndexOutOfBoundsException("Invalid input index for binary union node.");
+			}
+
+			return targetField;
+		}
+
+		@Override
+		public FieldSet getReadFields(int input) {
+			if (input != 0 && input != 1) {
+				throw new IndexOutOfBoundsException("Invalid input index for binary union node.");
+			}
+
+			return FieldSet.EMPTY_SET;
+		}
+
+	}
+}
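
UnionSemanticProperties above encodes that a union forwards every field of
either input unchanged and reads no fields itself, which is what lets
upstream partitioning properties survive the union. A quick hypothetical
check of that contract (an editor's sketch, assuming only the classes that
appear in this commit):

    import org.apache.flink.api.common.operators.util.FieldSet;
    import org.apache.flink.optimizer.dag.BinaryUnionNode.UnionSemanticProperties;

    public class UnionForwardingCheck {
        public static void main(String[] args) {
            UnionSemanticProperties props = new UnionSemanticProperties();

            // field 2 of either input is forwarded to field 2 of the output
            FieldSet targets = props.getForwardingTargetFields(0, 2);
            System.out.println(targets.contains(2));            // true

            // the union itself reads no fields
            System.out.println(props.getReadFields(1).size());  // 0
        }
    }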


Mime
View raw message