flink-commits mailing list archives

From se...@apache.org
Subject [39/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
Date Fri, 20 Mar 2015 10:07:18 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/Costs.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/Costs.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/Costs.java
deleted file mode 100644
index 7c854bf..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/Costs.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.costs;
-
-/**
- * Simple class to represent the costs of an operation. The costs currently track network, I/O, and CPU costs.
- * 
- * Costs are composed of two parts of cost contributors:
- * <ol>
- *   <li>Quantifiable costs. Those costs are used when estimates are available and track a quantifiable
- *       measure, such as the number of bytes for network or I/O</li>
- *   <li>Heuristic costs. Those costs are used when no estimates are available. They can be used to track that
- *       an operator uses a special operation which is heuristically considered more expensive than another
- *       operation.</li>
- * </ol>
- * <p>
- * The quantifiable costs may frequently be unknown, which is represented by a {@code -1} as the value for the
- * unknown components of the cost. When that is the case, the operations' costs cannot be compared and hence it
- * is not decidable which operation to favor during pruning. The heuristic costs should then contain a value to
- * make sure that operators with different strategies are comparable, even in the absence of estimates. The heuristic
- * costs are hence the system's mechanism for realizing pruning heuristics that favor some operations over others.
- */
-public class Costs implements Comparable<Costs>, Cloneable {
-
-	public static final double UNKNOWN = -1;
-	
-	private double networkCost;				// network cost, in transferred bytes
-
-	private double diskCost;		// bytes to be written and read, in bytes
-	
-	private double cpuCost;					// CPU costs
-	
-	private double heuristicNetworkCost;
-	
-	private double heuristicDiskCost;
-	
-	private double heuristicCpuCost;
-	
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Default constructor. Initializes all costs to 0.
-	 */
-	public Costs() {
-	}
-
-	/**
-	 * Creates a new costs object using the given values for the network and storage cost.
-	 * 
-	 * @param networkCost The network cost, in bytes to be transferred.
-	 * @param diskCost The cost for disk, in bytes to be written and read.
-	 */
-	public Costs(double networkCost, double diskCost) {
-		setNetworkCost(networkCost);
-		setDiskCost(diskCost);
-	}
-	
-	/**
-	 * Creates a new costs object using the given values for the network and storage cost.
-	 * 
-	 * @param networkCost The network cost, in bytes to be transferred.
-	 * @param diskCost The cost for disk, in bytes to be written and read.
-	 * @param cpuCost The cost for CPU operations.
-	 */
-	public Costs(double networkCost, double diskCost, double cpuCost) {
-		setNetworkCost(networkCost);
-		setDiskCost(diskCost);
-		setCpuCost(cpuCost);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the network cost.
-	 * 
-	 * @return The network cost, in bytes to be transferred.
-	 */
-	public double getNetworkCost() {
-		return networkCost;
-	}
-
-	/**
-	 * Sets the network cost for this Costs object.
-	 * 
-	 * @param bytes
-	 *        The network cost to set, in bytes to be transferred.
-	 */
-	public void setNetworkCost(double bytes) {
-		if (bytes == UNKNOWN || bytes >= 0) {
-			this.networkCost = bytes;
-		} else {
-			throw new IllegalArgumentException();
-		}
-	}
-	
-	/**
-	 * Adds the costs for network to the current network costs
-	 * for this Costs object.
-	 * 
-	 * @param bytes The network cost to add, in bytes to be transferred.
-	 */
-	public void addNetworkCost(double bytes) {
-		this.networkCost = (this.networkCost < 0 || bytes < 0) ? UNKNOWN : this.networkCost + bytes;
-	}
-
-	/**
-	 * Gets the costs for disk.
-	 * 
-	 * @return The disk cost, in bytes to be written and read.
-	 */
-	public double getDiskCost() {
-		return diskCost;
-	}
-
-	/**
-	 * Sets the costs for disk for this Costs object.
-	 * 
-	 * @param bytes The disk cost to set, in bytes to be written and read.
-	 */
-	public void setDiskCost(double bytes) {
-		if (bytes == UNKNOWN || bytes >= 0) {
-			this.diskCost = bytes;
-		} else {
-			throw new IllegalArgumentException();
-		}
-	}
-	
-	/**
-	 * Adds the costs for disk to the current disk costs
-	 * for this Costs object.
-	 * 
-	 * @param bytes The disk cost to add, in bytes to be written and read.
-	 */
-	public void addDiskCost(double bytes) {
-		this.diskCost = 
-			(this.diskCost < 0 || bytes < 0) ? UNKNOWN : this.diskCost + bytes;
-	}
-	
-	/**
-	 * Gets the cost for the CPU.
-	 * 
-	 * @return The CPU Cost.
-	 */
-	public double getCpuCost() {
-		return this.cpuCost;
-	}
-
-	/**
-	 * Sets the cost for the CPU.
-	 * 
-	 * @param cost The CPU Cost.
-	 */
-	public void setCpuCost(double cost) {
-		if (cost == UNKNOWN || cost >= 0) {
-			this.cpuCost = cost;
-		} else {
-			throw new IllegalArgumentException();
-		}
-	}
-	
-	/**
-	 * Adds the given CPU cost to the current CPU cost for this Costs object.
-	 * 
-	 * @param cost The CPU cost to add.
-	 */
-	public void addCpuCost(double cost) {
-		this.cpuCost = 
-			(this.cpuCost < 0 || cost < 0) ? UNKNOWN : this.cpuCost + cost;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the heuristic network cost.
-	 * 
-	 * @return The heuristic network cost, in bytes to be transferred.
-	 */
-	public double getHeuristicNetworkCost() {
-		return this.heuristicNetworkCost;
-	}
-
-	/**
-	 * Sets the heuristic network cost for this Costs object.
-	 * 
-	 * @param cost The heuristic network cost to set, in bytes to be transferred.
-	 */
-	public void setHeuristicNetworkCost(double cost) {
-		if (cost <= 0) {
-			throw new IllegalArgumentException("Heuristic costs must be positive.");
-		}
-		this.heuristicNetworkCost = cost;
-	}
-	
-	/**
-	 * Adds the heuristic costs for network to the current heuristic network costs
-	 * for this Costs object.
-	 * 
-	 * @param cost The heuristic network cost to add.
-	 */
-	public void addHeuristicNetworkCost(double cost) {
-		if (cost <= 0) {
-			throw new IllegalArgumentException("Heuristic costs must be positive.");
-		}
-		this.heuristicNetworkCost += cost;
-		// check for overflow
-		if (this.heuristicNetworkCost < 0) {
-			this.heuristicNetworkCost = Double.MAX_VALUE;
-		}
-	}
-
-	/**
-	 * Gets the heuristic costs for disk.
-	 * 
-	 * @return The heuristic disk cost.
-	 */
-	public double getHeuristicDiskCost() {
-		return this.heuristicDiskCost;
-	}
-
-	/**
-	 * Sets the heuristic costs for disk for this Costs object.
-	 * 
-	 * @param cost The heuristic disk cost to set.
-	 */
-	public void setHeuristicDiskCost(double cost) {
-		if (cost <= 0) {
-			throw new IllegalArgumentException("Heuristic costs must be positive.");
-		}
-		this.heuristicDiskCost = cost;
-	}
-	
-	/**
-	 * Adds the heuristic costs for disk to the current heuristic disk costs
-	 * for this Costs object.
-	 * 
-	 * @param cost The heuristic disk cost to add.
-	 */
-	public void addHeuristicDiskCost(double cost) {
-		if (cost <= 0) {
-			throw new IllegalArgumentException("Heuristic costs must be positive.");
-		}
-		this.heuristicDiskCost += cost;
-		// check for overflow
-		if (this.heuristicDiskCost < 0) {
-			this.heuristicDiskCost = Double.MAX_VALUE;
-		}
-	}
-	
-	/**
-	 * Gets the heuristic cost for the CPU.
-	 * 
-	 * @return The heuristic CPU Cost.
-	 */
-	public double getHeuristicCpuCost() {
-		return this.heuristicCpuCost;
-	}
-
-	/**
-	 * Sets the heuristic cost for the CPU.
-	 * 
-	 * @param cost The heuristic CPU Cost.
-	 */
-	public void setHeuristicCpuCost(double cost) {
-		if (cost <= 0) {
-			throw new IllegalArgumentException("Heuristic costs must be positive.");
-		}
-		this.heuristicCpuCost = cost;
-	}
-	
-	/**
-	 * Adds the given heuristic CPU cost to the current heuristic CPU cost for this Costs object.
-	 * 
-	 * @param cost The heuristic CPU cost to add.
-	 */
-	public void addHeuristicCpuCost(double cost) {
-		if (cost <= 0) {
-			throw new IllegalArgumentException("Heuristic costs must be positive.");
-		}
-		this.heuristicCpuCost += cost;
-		// check for overflow
-		if (this.heuristicCpuCost < 0) {
-			this.heuristicCpuCost = Double.MAX_VALUE;
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Adds the given costs to these costs. If one of the cost components (network, disk, CPU)
-	 * is unknown in either operand, the resulting component will be unknown.
-	 * 
-	 * @param other The costs to add.
-	 */
-	public void addCosts(Costs other) {
-		// ---------- quantifiable costs ----------
-		if (this.networkCost == UNKNOWN || other.networkCost == UNKNOWN) {
-			this.networkCost = UNKNOWN;
-		} else {
-			this.networkCost += other.networkCost;
-		}
-		
-		if (this.diskCost == UNKNOWN || other.diskCost == UNKNOWN) {
-			this.diskCost = UNKNOWN;
-		} else {
-			this.diskCost += other.diskCost;
-		}
-		
-		if (this.cpuCost == UNKNOWN || other.cpuCost == UNKNOWN) {
-			this.cpuCost = UNKNOWN;
-		} else {
-			this.cpuCost += other.cpuCost;
-		}
-		
-		// ---------- heuristic costs ----------
-		
-		this.heuristicNetworkCost += other.heuristicNetworkCost;
-		this.heuristicDiskCost += other.heuristicDiskCost;
-		this.heuristicCpuCost += other.heuristicCpuCost;
-	}
-	
-	/**
-	 * Subtracts the given costs from these costs. If the given costs are unknown, these costs remain unchanged.
-	 *  
-	 * @param other The costs to subtract.
-	 */
-	public void subtractCosts(Costs other) {
-		if (this.networkCost != UNKNOWN && other.networkCost != UNKNOWN) {
-			this.networkCost -= other.networkCost;
-			if (this.networkCost < 0) {
-				throw new IllegalArgumentException("Cannot subtract more cost then there is.");
-			}
-		}
-		if (this.diskCost != UNKNOWN && other.diskCost != UNKNOWN) {
-			this.diskCost -= other.diskCost;
-			if (this.diskCost < 0) {
-				throw new IllegalArgumentException("Cannot subtract more cost then there is.");
-			}
-		}
-		if (this.cpuCost != UNKNOWN && other.cpuCost != UNKNOWN) {
-			this.cpuCost -= other.cpuCost;
-			if (this.cpuCost < 0) {
-				throw new IllegalArgumentException("Cannot subtract more cost then there is.");
-			}
-		}
-		
-		// ---------- heuristic costs ----------
-		
-		this.heuristicNetworkCost -= other.heuristicNetworkCost;
-		if (this.heuristicNetworkCost < 0) {
-			throw new IllegalArgumentException("Cannot subtract more cost then there is.");
-		}
-		this.heuristicDiskCost -= other.heuristicDiskCost;
-		if (this.heuristicDiskCost < 0) {
-			throw new IllegalArgumentException("Cannot subtract more cost then there is.");
-		}
-		this.heuristicCpuCost -= other.heuristicCpuCost;
-		if (this.heuristicCpuCost < 0) {
-			throw new IllegalArgumentException("Cannot subtract more cost then there is.");
-		}
-	}
-	
-	public void multiplyWith(int factor) {
-		this.networkCost = this.networkCost < 0 ? -1 : this.networkCost * factor;
-		this.diskCost = this.diskCost < 0 ? -1 : this.diskCost * factor;
-		this.cpuCost = this.cpuCost < 0 ? -1 : this.cpuCost * factor;
-		this.heuristicNetworkCost = this.heuristicNetworkCost < 0 ? -1 : this.heuristicNetworkCost * factor;
-		this.heuristicDiskCost = this.heuristicDiskCost < 0 ? -1 : this.heuristicDiskCost * factor;
-		this.heuristicCpuCost = this.heuristicCpuCost < 0 ? -1 : this.heuristicCpuCost * factor;
-	}
-
-	public void divideBy(int factor) {
-		this.networkCost = this.networkCost < 0 ? -1 : this.networkCost / factor;
-		this.diskCost = this.diskCost < 0 ? -1 : this.diskCost / factor;
-		this.cpuCost = this.cpuCost < 0 ? -1 : this.cpuCost / factor;
-		this.heuristicNetworkCost = this.heuristicNetworkCost < 0 ? -1 : this.heuristicNetworkCost / factor;
-		this.heuristicDiskCost = this.heuristicDiskCost < 0 ? -1 : this.heuristicDiskCost / factor;
-		this.heuristicCpuCost = this.heuristicCpuCost < 0 ? -1 : this.heuristicCpuCost / factor;
-	}
-
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * The order of comparison is: network first, then disk, then CPU. For each component, the quantifiable
-	 * costs are compared when they are known on both sides; otherwise the heuristic costs are compared.
-	 * 
-	 * @see java.lang.Comparable#compareTo(java.lang.Object)
-	 */
-	@Override
-	public int compareTo(Costs o) {
-		// check the network cost. if we have actual costs on both, use them, otherwise use the heuristic costs.
-		if (this.networkCost != UNKNOWN && o.networkCost != UNKNOWN) {
-			if (this.networkCost != o.networkCost) {
-				return this.networkCost < o.networkCost ? -1 : 1;
-			}
-		} else if (this.heuristicNetworkCost < o.heuristicNetworkCost) {
-			return -1;
-		} else if (this.heuristicNetworkCost > o.heuristicNetworkCost) {
-			return 1;
-		}
-		
-		// next, check the disk cost. again, if we have actual costs on both, use them, otherwise use the heuristic costs.
-		if (this.diskCost != UNKNOWN && o.diskCost != UNKNOWN) {
-			if (this.diskCost != o.diskCost) {
-				return this.diskCost < o.diskCost ? -1 : 1;
-			}
-		} else if (this.heuristicDiskCost < o.heuristicDiskCost) {
-			return -1;
-		} else if (this.heuristicDiskCost > o.heuristicDiskCost) {
-			return 1;
-		}
-		
-		// finally, check the CPU cost. again, if we have actual costs on both, use them, otherwise use the heuristic costs.
-		if (this.cpuCost != UNKNOWN && o.cpuCost != UNKNOWN) {
-			return this.cpuCost < o.cpuCost ? -1 : this.cpuCost > o.cpuCost ? 1 : 0;
-		} else if (this.heuristicCpuCost < o.heuristicCpuCost) {
-			return -1;
-		} else if (this.heuristicCpuCost > o.heuristicCpuCost) {
-			return 1;
-		} else {
-			return 0;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		long cpuCostBits = Double.doubleToLongBits(cpuCost);
-		long heuristicCpuCostBits = Double.doubleToLongBits(heuristicCpuCost);
-		long heuristicNetworkCostBits = Double.doubleToLongBits(heuristicNetworkCost);
-		long heuristicDiskCostBits = Double.doubleToLongBits(heuristicDiskCost);
-		long networkCostBits = Double.doubleToLongBits(networkCost);
-		long diskCostBits = Double.doubleToLongBits(diskCost);
-
-		result = prime * result + (int) (cpuCostBits ^ (cpuCostBits >>> 32));
-		result = prime * result + (int) (heuristicCpuCostBits ^ (heuristicCpuCostBits >>> 32));
-		result = prime * result + (int) (heuristicNetworkCostBits ^ (heuristicNetworkCostBits >>> 32));
-		result = prime * result + (int) (heuristicDiskCostBits ^ (heuristicDiskCostBits >>> 32));
-		result = prime * result + (int) (networkCostBits ^ (networkCostBits >>> 32));
-		result = prime * result + (int) (diskCostBits ^ (diskCostBits >>> 32));
-		return result;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj != null && obj.getClass() == getClass()) {
-			final Costs other = (Costs) obj;
-			return this.networkCost == other.networkCost &&
-					this.diskCost == other.diskCost &&
-					this.cpuCost == other.cpuCost &&
-					this.heuristicNetworkCost == other.heuristicNetworkCost &&
-					this.heuristicDiskCost == other.heuristicDiskCost &&
-					this.heuristicCpuCost == other.heuristicCpuCost;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "Costs [networkCost=" + networkCost + ", diskCost=" + diskCost + 
-				", cpuCost=" + cpuCost + ", heuristicNetworkCost=" + heuristicNetworkCost + 
-				", heuristicDiskCost=" + heuristicDiskCost + ", heuristicCpuCost=" + heuristicCpuCost + "]";
-	}
-
-	@Override
-	public Costs clone() {
-		try {
-			return (Costs) super.clone();
-		} catch (CloneNotSupportedException e) {
-			throw new RuntimeException(e);	// should never happen
-		}
-	}
-}
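
A minimal usage sketch may help illustrate the pruning fallback described in the class comment above. This example is hypothetical and not part of the commit; it assumes only the Costs class as deleted above:

    import org.apache.flink.optimizer.costs.Costs;

    public class CostsPruningExample {
        public static void main(String[] args) {
            // one plan has a known network estimate, the other does not
            Costs withEstimate = new Costs(1_000_000, 0);    // 1 MB over the network, no disk
            Costs noEstimate = new Costs(Costs.UNKNOWN, 0);  // network cost unknown

            // heuristic costs keep the two plans comparable despite the missing estimate
            withEstimate.addHeuristicNetworkCost(1);
            noEstimate.addHeuristicNetworkCost(1_000_000_000L);

            // one quantifiable network cost is UNKNOWN, so compareTo() falls back to
            // the heuristic network costs; a negative result means "cheaper"
            System.out.println(withEstimate.compareTo(noEstimate)); // prints -1
        }
    }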

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/DefaultCostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/DefaultCostEstimator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/DefaultCostEstimator.java
deleted file mode 100644
index 543f330..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/costs/DefaultCostEstimator.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.costs;
-
-import org.apache.flink.optimizer.dag.EstimateProvider;
-
-/**
- * A default cost estimator that has access to basic size and cardinality estimates.
- * <p>
- * This estimator works with actual estimates (as far as they are available) and falls back to setting
- * relative costs, if no estimates are available. That way, the estimator makes sure that plans with
- * different strategies are costed differently, even in the absence of estimates. The different relative
- * costs in the absence of estimates represent this estimator's heuristic guidance towards certain strategies.
- * <p>
- * For robustness reasons, we always assume that the whole data is shipped during a repartition step. We deviate from
- * the typical estimate of <code>(n - 1) / n</code> (with <i>n</i> being the number of nodes), because for a parallelism
- * of 1, that would yield a shipping of zero bytes. While this is usually correct, the runtime scheduling may
- * still choose to move tasks to different nodes, so we cannot be certain that no data is shipped.
- */
-public class DefaultCostEstimator extends CostEstimator {
-	
-	/**
-	 * The base for the estimation of all relative costs. We heuristically pick a very large data volume, which
-	 * will favor strategies that are less expensive on large data volumes. This is robust.
-	 */
-	private static final long HEURISTIC_COST_BASE = 1000000000L;
-	
-	// The numbers for the CPU effort are rather magic at the moment and should be seen as ordinal rather than cardinal
-	
-	private static final float MATERIALIZATION_CPU_FACTOR = 1;
-	
-	private static final float HASHING_CPU_FACTOR = 4;
-	
-	private static final float SORTING_CPU_FACTOR = 9;
-	
-	
-	// --------------------------------------------------------------------------------------------
-	// Shipping Strategy Cost
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void addRandomPartitioningCost(EstimateProvider estimates, Costs costs) {
-		// conservative estimate: we need to ship the whole data over the network to establish the
-		// partitioning. no disk costs.
-		final long estOutShipSize = estimates.getEstimatedOutputSize();
-		if (estOutShipSize <= 0) {
-			costs.setNetworkCost(Costs.UNKNOWN);
-		} else {
-			costs.addNetworkCost(estOutShipSize);
-		}
-		costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE);
-	}
-	
-	@Override
-	public void addHashPartitioningCost(EstimateProvider estimates, Costs costs) {
-		// conservative estimate: we need to ship the whole data over the network to establish the
-		// partitioning. no disk costs.
-		final long estOutShipSize = estimates.getEstimatedOutputSize();
-		if (estOutShipSize <= 0) {
-			costs.setNetworkCost(Costs.UNKNOWN);
-		} else {
-			costs.addNetworkCost(estOutShipSize);
-		}
-		costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE);
-	}
-	
-	@Override
-	public void addRangePartitionCost(EstimateProvider estimates, Costs costs) {
-		final long dataSize = estimates.getEstimatedOutputSize();
-		if (dataSize > 0) {
-			// Assume sampling of 10% of the data and spilling it to disk
-			final long sampled = (long) (dataSize * 0.1f);
-			// set shipping costs
-			costs.addNetworkCost(dataSize + sampled);
-		} else {
-			costs.setNetworkCost(Costs.UNKNOWN);
-		}
-		
-		// for the heuristic costs, apply the same sampling assumption as above
-		final long sampled = (long) (HEURISTIC_COST_BASE * 0.1f);
-		costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE + sampled);
-		costs.addHeuristicDiskCost(2 * sampled);
-	}
-
-	@Override
-	public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) {
-		// the replication factor must be positive, otherwise we cannot calculate broadcast costs
-		if (replicationFactor <= 0) {
-			throw new IllegalArgumentException("The replication factor must be larger than zero.");
-		}
-
-		// assumption: we need to ship the whole data over the network to each node.
-		final long estOutShipSize = estimates.getEstimatedOutputSize();
-		if (estOutShipSize <= 0) {
-			costs.setNetworkCost(Costs.UNKNOWN);
-		} else {
-			costs.addNetworkCost(replicationFactor * estOutShipSize);
-		}
-		costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 10 * replicationFactor);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// Local Strategy Cost
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void addFileInputCost(long fileSizeInBytes, Costs costs) {
-		if (fileSizeInBytes >= 0) {
-			costs.addDiskCost(fileSizeInBytes);
-		} else {
-			costs.setDiskCost(Costs.UNKNOWN);
-		}
-		costs.addHeuristicDiskCost(HEURISTIC_COST_BASE);
-	}
-	
-	@Override
-	public void addLocalSortCost(EstimateProvider estimates, Costs costs) {
-		final long s = estimates.getEstimatedOutputSize();
-		// we assume a two phase merge sort, so all in all 2 I/O operations per block
-		if (s <= 0) {
-			costs.setDiskCost(Costs.UNKNOWN);
-			costs.setCpuCost(Costs.UNKNOWN);
-		} else {
-			costs.addDiskCost(2 * s);
-			costs.addCpuCost((long) (s * SORTING_CPU_FACTOR));
-		}
-		costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
-		costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * SORTING_CPU_FACTOR));
-	}
-
-	@Override
-	public void addLocalMergeCost(EstimateProvider input1, EstimateProvider input2, Costs costs, int costWeight) {
-		// costs nothing. the very rarely incurred cost for a spilling block nested loops join in the
-		// presence of massively re-occurring duplicate keys is ignored, because it cannot be assessed
-	}
-
-	@Override
-	public void addHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, Costs costs, int costWeight) {
-		long bs = buildSideInput.getEstimatedOutputSize();
-		long ps = probeSideInput.getEstimatedOutputSize();
-		
-		if (bs > 0 && ps > 0) {
-			long overall = 2*bs + ps;
-			costs.addDiskCost(overall);
-			costs.addCpuCost((long) (overall * HASHING_CPU_FACTOR));
-		} else {
-			costs.setDiskCost(Costs.UNKNOWN);
-			costs.setCpuCost(Costs.UNKNOWN);
-		}
-		costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
-		costs.addHeuristicCpuCost((long) (2 * HEURISTIC_COST_BASE * HASHING_CPU_FACTOR));
-		
-		// cost weight applies to everything
-		costs.multiplyWith(costWeight);
-	}
-	
-	/**
-	 * Calculates the costs for the cached variant of the hybrid hash join.
-	 * We assume by default that half of the cached hash table fits into memory.
-	 */
-	@Override
-	public void addCachedHybridHashCosts(EstimateProvider buildSideInput, EstimateProvider probeSideInput, Costs costs, int costWeight) {
-		if (costWeight < 1) {
-			throw new IllegalArgumentException("The cost weight must be at least one.");
-		}
-		
-		long bs = buildSideInput.getEstimatedOutputSize();
-		long ps = probeSideInput.getEstimatedOutputSize();
-		
-		if (bs > 0 && ps > 0) {
-			long overall = 2*bs + costWeight*ps;
-			costs.addDiskCost(overall);
-			costs.addCpuCost((long) (overall * HASHING_CPU_FACTOR));
-		} else {
-			costs.setDiskCost(Costs.UNKNOWN);
-			costs.setCpuCost(Costs.UNKNOWN);
-		}
-		
-		// the build side once, plus the probe side cost-weight times
-		costs.addHeuristicDiskCost((1 + costWeight) * HEURISTIC_COST_BASE);
-		costs.addHeuristicCpuCost((long) ((1 + costWeight) * HEURISTIC_COST_BASE * HASHING_CPU_FACTOR));
-	}
-
-	@Override
-	public void addStreamedNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long bufferSize, Costs costs, int costWeight) {
-		long is = innerSide.getEstimatedOutputSize(); 
-		long oc = outerSide.getEstimatedNumRecords();
-		
-		if (is > 0 && oc >= 0) {
-			// costs, if the inner side cannot be cached
-			if (is > bufferSize) {
-				costs.addDiskCost(oc * is);
-			}
-			costs.addCpuCost((long) (oc * is * MATERIALIZATION_CPU_FACTOR));
-		} else {
-			costs.setDiskCost(Costs.UNKNOWN);
-			costs.setCpuCost(Costs.UNKNOWN);
-		}
-		
-		// hack: assume 100k loops (should be expensive enough)
-		costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 100000);
-		costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * 100000 * MATERIALIZATION_CPU_FACTOR));
-		costs.multiplyWith(costWeight);
-	}
-
-	@Override
-	public void addBlockNestedLoopsCosts(EstimateProvider outerSide, EstimateProvider innerSide, long blockSize, Costs costs, int costWeight) {
-		long is = innerSide.getEstimatedOutputSize(); 
-		long os = outerSide.getEstimatedOutputSize();
-		
-		if (is > 0 && os > 0) {
-			long loops = Math.max(os / blockSize, 1);
-			costs.addDiskCost(loops * is);
-			costs.addCpuCost((long) (loops * is * MATERIALIZATION_CPU_FACTOR));
-		} else {
-			costs.setDiskCost(Costs.UNKNOWN);
-			costs.setCpuCost(Costs.UNKNOWN);
-		}
-		
-		// hack: assume 1k loops (much cheaper than the streamed variant!)
-		costs.addHeuristicDiskCost(HEURISTIC_COST_BASE * 1000);
-		costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * 1000 * MATERIALIZATION_CPU_FACTOR));
-		costs.multiplyWith(costWeight);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Damming Cost
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void addArtificialDamCost(EstimateProvider estimates, long bufferSize, Costs costs) {
-		final long s = estimates.getEstimatedOutputSize();
-		// we assume spilling and re-reading
-		if (s <= 0) {
-			costs.setDiskCost(Costs.UNKNOWN);
-			costs.setCpuCost(Costs.UNKNOWN);
-		} else {
-			costs.addDiskCost(2 * s);
-			costs.addCpuCost((long) (s * MATERIALIZATION_CPU_FACTOR));
-		}
-		costs.addHeuristicDiskCost(2 * HEURISTIC_COST_BASE);
-		costs.addHeuristicCpuCost((long) (HEURISTIC_COST_BASE * MATERIALIZATION_CPU_FACTOR));
-	}
-}
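
To see how the estimator keeps plans comparable in the absence of statistics, consider this hypothetical sketch (not part of the commit) against the DefaultCostEstimator and Costs classes deleted above, using the file-input cost as the simplest case:

    import org.apache.flink.optimizer.costs.Costs;
    import org.apache.flink.optimizer.costs.DefaultCostEstimator;

    public class EstimatorFallbackExample {
        public static void main(String[] args) {
            DefaultCostEstimator estimator = new DefaultCostEstimator();
            Costs known = new Costs();
            Costs unknown = new Costs();

            estimator.addFileInputCost(4_096L, known);  // size known: quantifiable disk cost added
            estimator.addFileInputCost(-1L, unknown);   // size unknown: disk cost set to UNKNOWN

            System.out.println(known.getDiskCost());             // 4096.0
            System.out.println(unknown.getDiskCost());           // -1.0 (UNKNOWN)
            // both objects received the same heuristic disk cost, so they stay comparable
            System.out.println(unknown.getHeuristicDiskCost());  // 1.0E9
        }
    }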

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
deleted file mode 100644
index d199ae7..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/AbstractPartialSolutionNode.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.util.Visitor;
-
-/**
- * The optimizer's internal representation of the partial solution that is input to a bulk iteration.
- */
-public abstract class AbstractPartialSolutionNode extends OptimizerNode {
-	
-	protected AbstractPartialSolutionNode(Operator<?> contract) {
-		super(contract);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	protected void copyEstimates(OptimizerNode node) {
-		this.estimatedNumRecords = node.estimatedNumRecords;
-		this.estimatedOutputSize = node.estimatedOutputSize;
-	}
-	
-	public abstract IterationNode getIterationNode();
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public boolean isOnDynamicPath() {
-		return true;
-	}
-	
-	public void identifyDynamicPath(int costWeight) {
-		this.onDynamicPath = true;
-		this.costWeight = costWeight;
-	}
-
-	@Override
-	public List<DagConnection> getIncomingConnections() {
-		return Collections.emptyList();
-	}
-
-	@Override
-	public void setInput(Map<Operator<?>, OptimizerNode> contractToNode, ExecutionMode dataExchangeMode) {}
-
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		// we do nothing here, because the estimates can only be copied from the iteration input
-	}
-	
-	@Override
-	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
-		// no children, so nothing to compute
-	}
-
-	@Override
-	public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
-		if (this.cachedPlans != null) {
-			return this.cachedPlans;
-		} else {
-			throw new IllegalStateException();
-		}
-	}
-
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return new EmptySemanticProperties();
-	}
-	
-	@Override
-	protected void readStubAnnotations() {}
-
-	@Override
-	public void accept(Visitor<OptimizerNode> visitor) {
-		if (visitor.preVisit(this)) {
-			visitor.postVisit(this);
-		}
-	}
-}
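
The accept() method above is the degenerate case of the optimizer's pre/post-order visitor traversal: the partial solution has no inputs, so postVisit() follows preVisit() immediately. As a hypothetical illustration (not part of the commit; the class name is invented), a visitor that counts the nodes it walks:

    import org.apache.flink.optimizer.dag.OptimizerNode;
    import org.apache.flink.util.Visitor;

    public class NodeCountingVisitor implements Visitor<OptimizerNode> {
        private int count;

        @Override
        public boolean preVisit(OptimizerNode visitable) {
            count++;
            return true; // descend into the node's inputs, if there are any
        }

        @Override
        public void postVisit(OptimizerNode visitable) {
            // on a leaf such as the partial solution, this fires right after preVisit()
            System.out.println("finished " + visitable.getName() + ", seen " + count + " node(s)");
        }
    }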

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
deleted file mode 100644
index 068799e..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.Union;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.BinaryUnionOpDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.NamedChannel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-/**
- * The Optimizer representation of a binary <i>Union</i>.
- */
-public class BinaryUnionNode extends TwoInputNode {
-	
-	private Set<RequestedGlobalProperties> channelProps;
-
-	public BinaryUnionNode(Union<?> union){
-		super(union);
-	}
-
-	@Override
-	public String getName() {
-		return "Union";
-	}
-
-	@Override
-	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		return Collections.emptyList();
-	}
-	
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
-		long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
-		this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 + card2;
-		
-		long size1 = getFirstPredecessorNode().getEstimatedOutputSize();
-		long size2 = getSecondPredecessorNode().getEstimatedOutputSize();
-		this.estimatedOutputSize = (size1 < 0 || size2 < 0) ? -1 : size1 + size2;
-	}
-	
-	@Override
-	public void computeUnionOfInterestingPropertiesFromSuccessors() {
-		super.computeUnionOfInterestingPropertiesFromSuccessors();
-		// clear all local properties, as they are destroyed anyway
-		getInterestingProperties().getLocalProperties().clear();
-	}
-	
-	@Override
-	public void computeInterestingPropertiesForInputs(CostEstimator estimator) { 
-		final InterestingProperties props = getInterestingProperties();
-		
-		// if no other properties exist, add the pruned trivial properties back
-		if (props.getGlobalProperties().isEmpty()) {
-			props.addGlobalProperties(new RequestedGlobalProperties());
-		}
-		props.addLocalProperties(new RequestedLocalProperties());
-		this.input1.setInterestingProperties(props.clone());
-		this.input2.setInterestingProperties(props.clone());
-		
-		this.channelProps = props.getGlobalProperties();
-	}
-	
-	@Override
-	public List<PlanNode> getAlternativePlans(CostEstimator estimator) {
-		// check if we have a cached version
-		if (this.cachedPlans != null) {
-			return this.cachedPlans;
-		}
-
-		// step down to all producer nodes and calculate alternative plans
-		final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator);
-		final List<? extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator);
-
-		List<DagConnection> broadcastConnections = getBroadcastConnections();
-		if (broadcastConnections != null && broadcastConnections.size() > 0) {
-			throw new CompilerException("Found BroadcastVariables on a Union operation");
-		}
-		
-		final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
-
-		final List<Set<? extends NamedChannel>> broadcastPlanChannels = Collections.emptyList();
-
-		final BinaryUnionOpDescriptor operator = new BinaryUnionOpDescriptor();
-		final RequestedLocalProperties noLocalProps = new RequestedLocalProperties();
-
-		final ExecutionMode input1Mode = this.input1.getDataExchangeMode();
-		final ExecutionMode input2Mode = this.input2.getDataExchangeMode();
-
-		final int dop = getParallelism();
-		final int inDop1 = getFirstPredecessorNode().getParallelism();
-		final int inDop2 = getSecondPredecessorNode().getParallelism();
-
-		final boolean dopChange1 = dop != inDop1;
-		final boolean dopChange2 = dop != inDop2;
-
-		final boolean input1breakPipeline = this.input1.isBreakingPipeline();
-		final boolean input2breakPipeline = this.input2.isBreakingPipeline();
-
-		// enumerate all pairwise combinations of the children's plans together with
-		// all possible operator strategy combinations
-		
-		// create all candidates
-		for (PlanNode child1 : subPlans1) {
-			for (PlanNode child2 : subPlans2) {
-				
-				// check that the children go together. that is the case if they build upon the same
-				// candidate at the joined branch of the plan.
-				if (!areBranchCompatible(child1, child2)) {
-					continue;
-				}
-				
-				for (RequestedGlobalProperties igps: this.channelProps) {
-					// create a candidate channel for the first input. mark it cached if the connection says so
-					Channel c1 = new Channel(child1, this.input1.getMaterializationMode());
-					if (this.input1.getShipStrategy() == null) {
-						// free to choose the ship strategy
-						igps.parameterizeChannel(c1, dopChange1, input1Mode, input1breakPipeline);
-						
-						// if the DOP changed, make sure that we cancel out properties, unless the
-						// ship strategy preserves/establishes them even under changing DOPs
-						if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
-							c1.getGlobalProperties().reset();
-						}
-					}
-					else {
-						// ship strategy fixed by compiler hint
-						ShipStrategyType shipStrategy = this.input1.getShipStrategy();
-						DataExchangeMode exMode = DataExchangeMode.select(input1Mode, shipStrategy, input1breakPipeline);
-						if (this.keys1 != null) {
-							c1.setShipStrategy(this.input1.getShipStrategy(), this.keys1.toFieldList(), exMode);
-						} else {
-							c1.setShipStrategy(this.input1.getShipStrategy(), exMode);
-						}
-						
-						if (dopChange1) {
-							c1.adjustGlobalPropertiesForFullParallelismChange();
-						}
-					}
-					
-					// create a candidate channel for the second input. mark it cached if the connection says so
-					Channel c2 = new Channel(child2, this.input2.getMaterializationMode());
-					if (this.input2.getShipStrategy() == null) {
-						// free to choose the ship strategy
-						igps.parameterizeChannel(c2, dopChange2, input2Mode, input2breakPipeline);
-						
-						// if the DOP changed, make sure that we cancel out properties, unless the
-						// ship strategy preserves/establishes them even under changing DOPs
-						if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
-							c2.getGlobalProperties().reset();
-						}
-					} else {
-						// ship strategy fixed by compiler hint
-						ShipStrategyType shipStrategy = this.input2.getShipStrategy();
-						DataExchangeMode exMode = DataExchangeMode.select(input2Mode, shipStrategy, input2breakPipeline);
-						if (this.keys2 != null) {
-							c2.setShipStrategy(this.input2.getShipStrategy(), this.keys2.toFieldList(), exMode);
-						} else {
-							c2.setShipStrategy(this.input2.getShipStrategy(), exMode);
-						}
-						
-						if (dopChange2) {
-							c2.adjustGlobalPropertiesForFullParallelismChange();
-						}
-					}
-					
-					// get the global properties and clear unique fields (not preserved anyway during the union)
-					GlobalProperties p1 = c1.getGlobalProperties();
-					GlobalProperties p2 = c2.getGlobalProperties();
-					p1.clearUniqueFieldCombinations();
-					p2.clearUniqueFieldCombinations();
-					
-					// adjust the partitionings, if they exist but are not equal. this may happen when both channels have a
-					// partitioning that fulfills the requirements, but the two are incompatible. For example, a property
-					// requirement may be ANY_PARTITIONING on fields (0) while one channel is range partitioned on that
-					// field and the other is hash partitioned on that field.
-					if (!igps.isTrivial() && !(p1.equals(p2))) {
-						if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() != ShipStrategyType.FORWARD) {
-							// adjust c2 to c1
-							c2 = c2.clone();
-							p1.parameterizeChannel(c2, dopChange2, input2Mode, input2breakPipeline);
-						}
-						else if (c2.getShipStrategy() == ShipStrategyType.FORWARD && c1.getShipStrategy() != ShipStrategyType.FORWARD) {
-							// adjust c1 to c2
-							c1 = c1.clone();
-							p2.parameterizeChannel(c1,dopChange1, input1Mode, input1breakPipeline);
-						}
-						else if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() == ShipStrategyType.FORWARD) {
-							boolean adjustC1 = c1.getEstimatedOutputSize() <= 0 || c2.getEstimatedOutputSize() <= 0 ||
-									c1.getEstimatedOutputSize() <= c2.getEstimatedOutputSize();
-							if (adjustC1) {
-								c2 = c2.clone();
-								p1.parameterizeChannel(c2, dopChange2, input2Mode, input2breakPipeline);
-							} else {
-								c1 = c1.clone();
-								p2.parameterizeChannel(c1, dopChange1, input1Mode, input1breakPipeline);
-							}
-						} else {
-							// this should never happen, as it implies the two channels realize different strategies,
-							// which is excluded by the check that the required strategies must match
-							throw new CompilerException("Bug in Plan Enumeration for Union Node.");
-						}
-					}
-
-
-					instantiate(operator, c1, c2, broadcastPlanChannels, outputPlans, estimator, igps, igps, noLocalProps, noLocalProps);
-				}
-			}
-		}
-
-		// cost and prune the plans
-		for (PlanNode node : outputPlans) {
-			estimator.costOperator(node);
-		}
-		prunePlanAlternatives(outputPlans);
-		outputPlans.trimToSize();
-
-		this.cachedPlans = outputPlans;
-		return outputPlans;
-	}
-	
-	@Override
-	protected void readStubAnnotations() {}
-
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return new UnionSemanticProperties();
-	}
-	
-	@Override
-	public void computeOutputEstimates(DataStatistics statistics) {
-		OptimizerNode in1 = getFirstPredecessorNode();
-		OptimizerNode in2 = getSecondPredecessorNode();
-		
-		this.estimatedNumRecords = in1.estimatedNumRecords > 0 && in2.estimatedNumRecords > 0 ?
-				in1.estimatedNumRecords + in2.estimatedNumRecords : -1;
-		this.estimatedOutputSize = in1.estimatedOutputSize > 0 && in2.estimatedOutputSize > 0 ?
-			in1.estimatedOutputSize + in2.estimatedOutputSize : -1;
-	}
-
-	public static class UnionSemanticProperties implements SemanticProperties {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public FieldSet getForwardingTargetFields(int input, int sourceField) {
-			if (input != 0 && input != 1) {
-				throw new IndexOutOfBoundsException("Invalid input index for binary union node.");
-			}
-
-			return new FieldSet(sourceField);
-		}
-
-		@Override
-		public int getForwardingSourceField(int input, int targetField) {
-			if (input != 0 && input != 1) {
-				throw new IndexOutOfBoundsException();
-			}
-
-			return targetField;
-		}
-
-		@Override
-		public FieldSet getReadFields(int input) {
-			if (input != 0 && input != 1) {
-				throw new IndexOutOfBoundsException();
-			}
-
-			return FieldSet.EMPTY_SET;
-		}
-
-	}
-}
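
The UnionSemanticProperties above encode that a union forwards every field of both inputs unchanged and reads none of them. A hypothetical sketch (not part of the commit; the class name is invented) that exercises them:

    import org.apache.flink.api.common.operators.SemanticProperties;
    import org.apache.flink.optimizer.dag.BinaryUnionNode;

    public class UnionPropertiesExample {
        public static void main(String[] args) {
            SemanticProperties props = new BinaryUnionNode.UnionSemanticProperties();

            System.out.println(props.getForwardingTargetFields(0, 2)); // {2}: field 2 passes through as-is
            System.out.println(props.getForwardingSourceField(1, 5));  // 5: target field 5 stems from source field 5
            System.out.println(props.getReadFields(0));                // empty set: the union inspects no fields
            props.getReadFields(2); // throws IndexOutOfBoundsException, only inputs 0 and 1 exist
        }
    }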

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
deleted file mode 100644
index 55b8785..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dag.WorksetIterationNode.SingleRootJoiner;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.operators.NoOpDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.NamedChannel;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-/**
- * A node in the optimizer's program representation for a bulk iteration.
- */
-public class BulkIterationNode extends SingleInputNode implements IterationNode {
-	
-	private BulkPartialSolutionNode partialSolution;
-	
-	private OptimizerNode terminationCriterion;
-	
-	private OptimizerNode nextPartialSolution;
-	
-	private DagConnection rootConnection;		// connection out of the next partial solution
-	
-	private DagConnection terminationCriterionRootConnection;	// connection out of the term. criterion
-	
-	private OptimizerNode singleRoot;
-	
-	private final int costWeight;
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new node for the bulk iteration.
-	 * 
-	 * @param iteration The bulk iteration the node represents.
-	 */
-	public BulkIterationNode(BulkIterationBase<?> iteration) {
-		super(iteration);
-		
-		if (iteration.getMaximumNumberOfIterations() <= 0) {
-			throw new CompilerException("BulkIteration must have a maximum number of iterations specified.");
-		}
-		
-		int numIters = iteration.getMaximumNumberOfIterations();
-		
-		this.costWeight = (numIters > 0 && numIters < OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT) ?
-			numIters : OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT; 
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public BulkIterationBase<?> getIterationContract() {
-		return (BulkIterationBase<?>) getOperator();
-	}
-	
-	/**
-	 * Gets the partialSolution from this BulkIterationNode.
-	 *
-	 * @return The partialSolution.
-	 */
-	public BulkPartialSolutionNode getPartialSolution() {
-		return partialSolution;
-	}
-	
-	/**
-	 * Sets the partialSolution for this BulkIterationNode.
-	 *
-	 * @param partialSolution The partialSolution to set.
-	 */
-	public void setPartialSolution(BulkPartialSolutionNode partialSolution) {
-		this.partialSolution = partialSolution;
-	}
-
-	
-	/**
-	 * Gets the nextPartialSolution from this BulkIterationNode.
-	 *
-	 * @return The nextPartialSolution.
-	 */
-	public OptimizerNode getNextPartialSolution() {
-		return nextPartialSolution;
-	}
-	
-	/**
-	 * Sets the nextPartialSolution for this BulkIterationNode.
-	 *
-	 * @param nextPartialSolution The nextPartialSolution to set.
-	 */
-	public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) {
-		
-		// check whether the root of the step function has a different DOP than the iteration,
-		// whether the step function has no real operator at all, or whether it ends in a plain union
-		if (nextPartialSolution.getParallelism() != getParallelism() ||
-			nextPartialSolution == partialSolution || nextPartialSolution instanceof BinaryUnionNode)
-		{
-			// add a no-op to the root to express the re-partitioning
-			NoOpNode noop = new NoOpNode();
-			noop.setDegreeOfParallelism(getParallelism());
-
-			DagConnection noOpConn = new DagConnection(nextPartialSolution, noop, ExecutionMode.PIPELINED);
-			noop.setIncomingConnection(noOpConn);
-			nextPartialSolution.addOutgoingConnection(noOpConn);
-			
-			nextPartialSolution = noop;
-		}
-		
-		this.nextPartialSolution = nextPartialSolution;
-		this.terminationCriterion = terminationCriterion;
-		
-		if (terminationCriterion == null) {
-			this.singleRoot = nextPartialSolution;
-			this.rootConnection = new DagConnection(nextPartialSolution, ExecutionMode.PIPELINED);
-		}
-		else {
-			// we have a termination criterion
-			SingleRootJoiner singleRootJoiner = new SingleRootJoiner();
-			this.rootConnection = new DagConnection(nextPartialSolution, singleRootJoiner, ExecutionMode.PIPELINED);
-			this.terminationCriterionRootConnection = new DagConnection(terminationCriterion, singleRootJoiner,
-																		ExecutionMode.PIPELINED);
-
-			singleRootJoiner.setInputs(this.rootConnection, this.terminationCriterionRootConnection);
-			
-			this.singleRoot = singleRootJoiner;
-			
-			// add connection to terminationCriterion for interesting properties visitor
-			terminationCriterion.addOutgoingConnection(terminationCriterionRootConnection);
-		
-		}
-		
-		nextPartialSolution.addOutgoingConnection(rootConnection);
-	}
-	
-	public int getCostWeight() {
-		return this.costWeight;
-	}
-	
-	public OptimizerNode getSingleRootOfStepFunction() {
-		return this.singleRoot;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public String getName() {
-		return "Bulk Iteration";
-	}
-
-	@Override
-	public SemanticProperties getSemanticProperties() {
-		return new EmptySemanticProperties();
-	}
-	
-	protected void readStubAnnotations() {}
-	
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
-		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                             Properties and Optimization
-	// --------------------------------------------------------------------------------------------
-	
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return Collections.<OperatorDescriptorSingle>singletonList(new NoOpDescriptor());
-	}
-
-	@Override
-	public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
-		final InterestingProperties intProps = getInterestingProperties().clone();
-		
-		if (this.terminationCriterion != null) {
-			// first propagate through the termination criterion. since it has no successors, it has no
-			// interesting properties
-			this.terminationCriterionRootConnection.setInterestingProperties(new InterestingProperties());
-			this.terminationCriterion.accept(new InterestingPropertyVisitor(estimator));
-		}
-		
-		// we need to make 2 interesting-property passes, because the root of the step function also needs
-		// the interesting properties as generated by the partial solution
-		
-		// give our own interesting properties (as generated by the iteration's successors) to the step function and
-		// make the first pass
-		this.rootConnection.setInterestingProperties(intProps);
-		this.nextPartialSolution.accept(new InterestingPropertyVisitor(estimator));
-		
-		// take the interesting properties of the partial solution and add them to the root interesting properties
-		InterestingProperties partialSolutionIntProps = this.partialSolution.getInterestingProperties();
-		intProps.getGlobalProperties().addAll(partialSolutionIntProps.getGlobalProperties());
-		intProps.getLocalProperties().addAll(partialSolutionIntProps.getLocalProperties());
-		
-		// clear all interesting properties to prepare the second traversal
-		// this clears only the path down from the next partial solution. The paths down
-		// from the termination criterion (before they meet the paths down from the next partial solution)
-		// remain unaffected by this step
-		this.rootConnection.clearInterestingProperties();
-		this.nextPartialSolution.accept(InterestingPropertiesClearer.INSTANCE);
-		
-		// 2nd pass
-		this.rootConnection.setInterestingProperties(intProps);
-		this.nextPartialSolution.accept(new InterestingPropertyVisitor(estimator));
-		
-		// now add the interesting properties of the partial solution to the input
-		final InterestingProperties inProps = this.partialSolution.getInterestingProperties().clone();
-		inProps.addGlobalProperties(new RequestedGlobalProperties());
-		inProps.addLocalProperties(new RequestedLocalProperties());
-		this.inConn.setInterestingProperties(inProps);
-	}
-	
-	@Override
-	public void clearInterestingProperties() {
-		super.clearInterestingProperties();
-		
-		this.singleRoot.accept(InterestingPropertiesClearer.INSTANCE);
-		this.rootConnection.clearInterestingProperties();
-	}
-
-	@Override
-	public void computeUnclosedBranchStack() {
-		if (this.openBranches != null) {
-			return;
-		}
-
-		// the resulting branches are those of the step function
-		// because the BulkPartialSolution takes the input's branches
-		addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes);
-		List<UnclosedBranchDescriptor> result = getSingleRootOfStepFunction().openBranches;
-
-		this.openBranches = (result == null || result.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : result;
-	}
-
-
-	@Override
-	protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, List<Set<? extends NamedChannel>> broadcastPlanChannels, 
-			List<PlanNode> target, CostEstimator estimator, RequestedGlobalProperties globPropsReq, RequestedLocalProperties locPropsReq)
-	{
-		// NOTES ON THE ENUMERATION OF THE STEP FUNCTION PLANS:
-		// Whenever we instantiate the iteration, we enumerate new candidates for the step function.
-		// That way, we make sure that for each candidate for the initial partial solution, we have
-		// a fitting candidate for the step function (often, work is pushed out of the step function).
-		// Among the candidates of the step function, we keep only those that meet the requested
-		// properties of the current candidate initial partial solution. That makes sure these
-		// properties still hold at the beginning of the next iteration.
-		
-		// 1) Because we enumerate multiple times, we may need to clean the cached plans
-		//    before starting another enumeration
-		this.nextPartialSolution.accept(PlanCacheCleaner.INSTANCE);
-		if (this.terminationCriterion != null) {
-			this.terminationCriterion.accept(PlanCacheCleaner.INSTANCE);
-		}
-		
-		// 2) Give the partial solution the properties of the current candidate for the initial partial solution
-		this.partialSolution.setCandidateProperties(in.getGlobalProperties(), in.getLocalProperties(), in);
-		final BulkPartialSolutionPlanNode pspn = this.partialSolution.getCurrentPartialSolutionPlanNode();
-		
-		// 3) Get the alternative plans
-		List<PlanNode> candidates = this.nextPartialSolution.getAlternativePlans(estimator);
-		
-		// 4) Make sure that the beginning of the step function does not assume properties that 
-		//    are not also produced by the end of the step function.
-
-		{
-			List<PlanNode> newCandidates = new ArrayList<PlanNode>();
-			
-			for (Iterator<PlanNode> planDeleter = candidates.iterator(); planDeleter.hasNext(); ) {
-				PlanNode candidate = planDeleter.next();
-				
-				GlobalProperties atEndGlobal = candidate.getGlobalProperties();
-				LocalProperties atEndLocal = candidate.getLocalProperties();
-				
-				FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobal, atEndLocal);
-				if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
-					; // the candidate depends on the partial solution only through a broadcast variable
-				}
-				else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
-					// attach a no-op node that re-establishes the properties of the original input
-					Channel toNoOp = new Channel(candidate);
-					globPropsReq.parameterizeChannel(toNoOp, false, rootConnection.getDataExchangeMode(), false);
-					locPropsReq.parameterizeChannel(toNoOp);
-					
-					UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
-					rebuildPropertiesNode.setDegreeOfParallelism(candidate.getParallelism());
-					
-					SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
-					rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
-					estimator.costOperator(rebuildPropertiesPlanNode);
-						
-					GlobalProperties atEndGlobalModified = rebuildPropertiesPlanNode.getGlobalProperties();
-					LocalProperties atEndLocalModified = rebuildPropertiesPlanNode.getLocalProperties();
-						
-					if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) {
-						FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobalModified, atEndLocalModified);
-						
-						if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
-							newCandidates.add(rebuildPropertiesPlanNode);
-						}
-					}
-					
-					planDeleter.remove();
-				}
-			}
-		}
-		
-		if (candidates.isEmpty()) {
-			return;
-		}
-		
-		// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
-		if (terminationCriterion == null) {
-			for (PlanNode candidate : candidates) {
-				BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate);
-				GlobalProperties gProps = candidate.getGlobalProperties().clone();
-				LocalProperties lProps = candidate.getLocalProperties().clone();
-				node.initProperties(gProps, lProps);
-				target.add(node);
-			}
-		}
-		else if (candidates.size() > 0) {
-			List<PlanNode> terminationCriterionCandidates = this.terminationCriterion.getAlternativePlans(estimator);
-
-			SingleRootJoiner singleRoot = (SingleRootJoiner) this.singleRoot;
-			
-			for (PlanNode candidate : candidates) {
-				for (PlanNode terminationCandidate : terminationCriterionCandidates) {
-					if (singleRoot.areBranchCompatible(candidate, terminationCandidate)) {
-						BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getOperator().getName()+")", in, pspn, candidate, terminationCandidate);
-						GlobalProperties gProps = candidate.getGlobalProperties().clone();
-						LocalProperties lProps = candidate.getLocalProperties().clone();
-						node.initProperties(gProps, lProps);
-						target.add(node);
-						
-					}
-				}
-			}
-			
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                      Iteration Specific Traversals
-	// --------------------------------------------------------------------------------------------
-
-	public void acceptForStepFunction(Visitor<OptimizerNode> visitor) {
-		this.singleRoot.accept(visitor);
-	}
-}

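For orientation, the iteration node above is easiest to read next to the user program shape it compiles. In the DataSet API of this Flink generation, the IterativeDataSet is the partial solution, the operators applied to it form the step function, and the optional second argument to closeWith() is the termination criterion (the loop ends once that data set becomes empty). A minimal sketch, with an illustrative halving pipeline that is not taken from the Flink sources:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class BulkIterationSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// the partial solution: what BulkPartialSolutionNode represents in the optimizer DAG
		IterativeDataSet<Double> iteration = env.fromElements(1.0).iterate(100);

		// the step function: the sub-plan whose candidates instantiateCandidate() enumerates
		DataSet<Double> step = iteration.map(new MapFunction<Double, Double>() {
			@Override
			public Double map(Double value) {
				return value / 2.0;
			}
		});

		// the termination criterion: the iteration stops once this data set is empty
		DataSet<Double> criterion = step.filter(new FilterFunction<Double>() {
			@Override
			public boolean filter(Double value) {
				return value > 0.001;
			}
		});

		DataSet<Double> result = iteration.closeWith(step, criterion);
		result.print();
	}
}
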
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
deleted file mode 100644
index 25a7eef..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/BulkPartialSolutionNode.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.PlanNode;
-
-/**
- * The optimizer's internal representation of the partial solution that is input to a bulk iteration.
- */
-public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
-	
-	private final BulkIterationNode iterationNode;
-	
-	
-	public BulkPartialSolutionNode(PartialSolutionPlaceHolder<?> psph, BulkIterationNode iterationNode) {
-		super(psph);
-		this.iterationNode = iterationNode;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
-		if (this.cachedPlans != null) {
-			throw new IllegalStateException();
-		} else {
-			this.cachedPlans = Collections.<PlanNode>singletonList(new BulkPartialSolutionPlanNode(this,
-					"PartialSolution ("+this.getOperator().getName()+")", gProps, lProps, initialInput));
-		}
-	}
-	
-	public BulkPartialSolutionPlanNode getCurrentPartialSolutionPlanNode() {
-		if (this.cachedPlans != null) {
-			return (BulkPartialSolutionPlanNode) this.cachedPlans.get(0);
-		} else {
-			throw new IllegalStateException();
-		}
-	}
-	
-	public BulkIterationNode getIterationNode() {
-		return this.iterationNode;
-	}
-	
-	@Override
-	public void computeOutputEstimates(DataStatistics statistics) {
-		copyEstimates(this.iterationNode.getPredecessorNode());
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Gets the operator (here the {@link PartialSolutionPlaceHolder}) that is represented by this
-	 * optimizer node.
-	 * 
-	 * @return The operator represented by this optimizer node.
-	 */
-	@Override
-	public PartialSolutionPlaceHolder<?> getOperator() {
-		return (PartialSolutionPlaceHolder<?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Bulk Partial Solution";
-	}
-
-	@Override
-	public void computeUnclosedBranchStack() {
-		if (this.openBranches != null) {
-			return;
-		}
-
-		OptimizerNode inputToIteration = this.iterationNode.getPredecessorNode();
-		
-		addClosedBranches(inputToIteration.closedBranchingNodes);
-		List<UnclosedBranchDescriptor> fromInput = inputToIteration.getBranchesForParent(this.iterationNode.getIncomingConnection());
-		this.openBranches = (fromInput == null || fromInput.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput;
-	}
-}

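The estimate propagation above is deliberately simple: computeOutputEstimates() copies the estimates of the data set entering the iteration, so an iteration input estimated at, say, 10,000 records makes the partial solution report those same 10,000 records in every round, rather than attempting per-round estimates.
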
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
deleted file mode 100644
index 92076c3..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CoGroupNode.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.CoGroupDescriptor;
-import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetFirstDescriptor;
-import org.apache.flink.optimizer.operators.CoGroupWithSolutionSetSecondDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-
-/**
- * The Optimizer representation of a <i>CoGroup</i> operator.
- */
-public class CoGroupNode extends TwoInputNode {
-	
-	private List<OperatorDescriptorDual> dataProperties;
-	
-	public CoGroupNode(CoGroupOperatorBase<?, ?, ?, ?> operator) {
-		super(operator);
-		this.dataProperties = initializeDataProperties(operator.getCustomPartitioner());
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Gets the operator for this CoGroup node.
-	 * 
-	 * @return The CoGroup operator.
-	 */
-	@Override
-	public CoGroupOperatorBase<?, ?, ?, ?> getOperator() {
-		return (CoGroupOperatorBase<?, ?, ?, ?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "CoGroup";
-	}
-
-	@Override
-	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		return this.dataProperties;
-	}
-	
-	public void makeCoGroupWithSolutionSet(int solutionsetInputIndex) {
-		OperatorDescriptorDual op;
-		if (solutionsetInputIndex == 0) {
-			op = new CoGroupWithSolutionSetFirstDescriptor(keys1, keys2);
-		} else if (solutionsetInputIndex == 1) {
-			op = new CoGroupWithSolutionSetSecondDescriptor(keys1, keys2);
-		} else {
-			throw new IllegalArgumentException();
-		}
-		this.dataProperties = Collections.<OperatorDescriptorDual>singletonList(op);
-	}
-
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		// for CoGroup, we currently make no reasonable default estimates
-	}
-	
-	private List<OperatorDescriptorDual> initializeDataProperties(Partitioner<?> customPartitioner) {
-		Ordering groupOrder1 = null;
-		Ordering groupOrder2 = null;
-		
-		CoGroupOperatorBase<?, ?, ?, ?> cgc = getOperator();
-		groupOrder1 = cgc.getGroupOrderForInputOne();
-		groupOrder2 = cgc.getGroupOrderForInputTwo();
-			
-		if (groupOrder1 != null && groupOrder1.getNumberOfFields() == 0) {
-			groupOrder1 = null;
-		}
-		if (groupOrder2 != null && groupOrder2.getNumberOfFields() == 0) {
-			groupOrder2 = null;
-		}
-		
-		CoGroupDescriptor descr = new CoGroupDescriptor(this.keys1, this.keys2, groupOrder1, groupOrder2);
-		if (customPartitioner != null) {
-			descr.setCustomPartitioner(customPartitioner);
-		}
-		
-		return Collections.<OperatorDescriptorDual>singletonList(descr);
-	}
-}

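The group orders read in initializeDataProperties() are the per-group sort orders that, in the DataSet API, are requested via sortFirstGroup() / sortSecondGroup(); a custom partitioner, when present, is threaded into the descriptor and constrains the partitioning strategies the optimizer may choose. As a reminder of the program shape this node represents, a minimal sketch (the data and the function body are illustrative):

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CoGroupSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Long, String>> names = env.fromElements(new Tuple2<Long, String>(1L, "alpha"));
		DataSet<Tuple2<Long, Integer>> counts = env.fromElements(new Tuple2<Long, Integer>(1L, 7));

		// both inputs are keyed on field 0; these keys are what keys1 / keys2 capture in the node
		DataSet<String> joined = names.coGroup(counts)
			.where(0)
			.equalTo(0)
			.with(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, Integer>, String>() {
				@Override
				public void coGroup(Iterable<Tuple2<Long, String>> first,
						Iterable<Tuple2<Long, Integer>> second,
						Collector<String> out) {
					for (Tuple2<Long, String> name : first) {
						out.collect(name.f1);
					}
				}
			});

		joined.print();
	}
}
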
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
deleted file mode 100644
index 93be1e4..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CollectorMapNode.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.operators.CollectorMapDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
-
-/**
- * The optimizer's internal representation of a <i>Map</i> operator node.
- */
-public class CollectorMapNode extends SingleInputNode {
-	
-	private final List<OperatorDescriptorSingle> possibleProperties;
-
-	
-	public CollectorMapNode(SingleInputOperator<?, ?, ?> operator) {
-		super(operator);
-		
-		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new CollectorMapDescriptor());
-	}
-
-	@Override
-	public String getName() {
-		return "Map";
-	}
-
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return this.possibleProperties;
-	}
-
-	/**
-	 * Computes the estimates for the Map operator. Map takes one value and transforms it into another value.
-	 * The cardinality consequently stays the same.
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
-	}
-}

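Note that only the record count is carried over here: a predecessor estimated at, say, 1,000,000 records makes the Map node report 1,000,000 records as well, while no output-size estimate is derived, since the map function may change the width of each record arbitrarily.
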
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
deleted file mode 100644
index 8de67e8..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/CrossNode.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.CrossOperatorBase;
-import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.operators.CrossBlockOuterFirstDescriptor;
-import org.apache.flink.optimizer.operators.CrossBlockOuterSecondDescriptor;
-import org.apache.flink.optimizer.operators.CrossStreamOuterFirstDescriptor;
-import org.apache.flink.optimizer.operators.CrossStreamOuterSecondDescriptor;
-import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * The Optimizer representation of a <i>Cross</i> (Cartesian product) operator.
- */
-public class CrossNode extends TwoInputNode {
-	
-	private final List<OperatorDescriptorDual> dataProperties;
-	
-	/**
-	 * Creates a new CrossNode for the given operator.
-	 * 
-	 * @param operation The Cross operator object.
-	 */
-	public CrossNode(CrossOperatorBase<?, ?, ?, ?> operation) {
-		super(operation);
-		
-		Configuration conf = operation.getParameters();
-		String localStrategy = conf.getString(Optimizer.HINT_LOCAL_STRATEGY, null);
-	
-		CrossHint hint = operation.getCrossHint();
-		
-		if (localStrategy != null) {
-			
-			final boolean allowBCfirst = hint != CrossHint.SECOND_IS_SMALL;
-			final boolean allowBCsecond = hint != CrossHint.FIRST_IS_SMALL;
-			
-			final OperatorDescriptorDual fixedDriverStrat;
-			if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST.equals(localStrategy)) {
-				fixedDriverStrat = new CrossBlockOuterFirstDescriptor(allowBCfirst, allowBCsecond);
-			} else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND.equals(localStrategy)) {
-				fixedDriverStrat = new CrossBlockOuterSecondDescriptor(allowBCfirst, allowBCsecond);
-			} else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST.equals(localStrategy)) {
-				fixedDriverStrat = new CrossStreamOuterFirstDescriptor(allowBCfirst, allowBCsecond);
-			} else if (Optimizer.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND.equals(localStrategy)) {
-				fixedDriverStrat = new CrossStreamOuterSecondDescriptor(allowBCfirst, allowBCsecond);
-			} else {
-				throw new CompilerException("Invalid local strategy hint for cross contract: " + localStrategy);
-			}
-			
-			this.dataProperties = Collections.singletonList(fixedDriverStrat);
-		}
-		else if (hint == CrossHint.SECOND_IS_SMALL) {
-			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-			list.add(new CrossBlockOuterSecondDescriptor(false, true));
-			list.add(new CrossStreamOuterFirstDescriptor(false, true));
-			this.dataProperties = list;
-		}
-		else if (hint == CrossHint.FIRST_IS_SMALL) {
-			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-			list.add(new CrossBlockOuterFirstDescriptor(true, false));
-			list.add(new CrossStreamOuterSecondDescriptor(true, false));
-			this.dataProperties = list;
-		}
-		else {
-			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
-			list.add(new CrossBlockOuterFirstDescriptor());
-			list.add(new CrossBlockOuterSecondDescriptor());
-			list.add(new CrossStreamOuterFirstDescriptor());
-			list.add(new CrossStreamOuterSecondDescriptor());
-			this.dataProperties = list;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public CrossOperatorBase<?, ?, ?, ?> getOperator() {
-		return (CrossOperatorBase<?, ?, ?, ?>) super.getOperator();
-	}
-
-	@Override
-	public String getName() {
-		return "Cross";
-	}
-	
-	@Override
-	protected List<OperatorDescriptorDual> getPossibleProperties() {
-		return this.dataProperties;
-	}
-
-	/**
-	 * We assume that the cardinality is the product of the input cardinalities
-	 * and that the result width is the sum of the input widths.
-	 * 
-	 * @param statistics The statistics object to optionally access.
-	 */
-	@Override
-	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
-		long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
-		long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
-		this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 * card2;
-		
-		if (this.estimatedNumRecords >= 0) {
-			float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
-			float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
-			float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
-			
-			if (width > 0) {
-				this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
-			}
-		}
-	}
-}

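Worked through with illustrative numbers: inputs estimated at 1,000 and 2,000 records with average record widths of 8 and 16 bytes yield 1,000 × 2,000 = 2,000,000 output records of width 8 + 16 = 24 bytes, hence an estimated output size of 48,000,000 bytes. If either cardinality is unknown (negative), the record estimate stays unknown; if either width is unknown, the size estimate is simply not set.
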
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
deleted file mode 100644
index 4e65976..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dag/DagConnection.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.dag;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.optimizer.dataproperties.InterestingProperties;
-import org.apache.flink.optimizer.plandump.DumpableConnection;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-/**
- * A connection between two operators. Represents an intermediate result
- * and a data exchange between the two operators.
- *
- * The data exchange has a mode in which it is performed (batch / pipelined).
- *
- * The data exchange strategy may be set on this connection, in which case
- * it is fixed and will not be determined during candidate plan enumeration.
- *
- * During the enumeration of interesting properties, this connection also holds
- * all interesting properties generated by the successor operator.
- */
-public class DagConnection implements EstimateProvider, DumpableConnection<OptimizerNode> {
-	
-	private final OptimizerNode source; // The source node of the connection
-
-	private final OptimizerNode target; // The target node of the connection.
-
-	private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange
-
-	private InterestingProperties interestingProps; // properties that succeeding nodes are interested in
-
-	private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined.
-	
-	private TempMode materializationMode = TempMode.NONE; // the materialization mode
-	
-	private int maxDepth = -1;
-
-	private boolean breakPipeline;  // whether this connection should break the pipeline due to potential deadlocks
-
-	/**
-	 * Creates a new Connection between two nodes. The shipping strategy is by default <tt>NONE</tt>.
-	 * The temp mode is by default <tt>NONE</tt>.
-	 * 
-	 * @param source
-	 *        The source node.
-	 * @param target
-	 *        The target node.
-	 * @param exchangeMode
-	 *        The data exchange mode (pipelined / batch).
-	 */
-	public DagConnection(OptimizerNode source, OptimizerNode target, ExecutionMode exchangeMode) {
-		this(source, target, null, exchangeMode);
-	}
-
-	/**
-	 * Creates a new Connection between two nodes.
-	 * 
-	 * @param source
-	 *        The source node.
-	 * @param target
-	 *        The target node.
-	 * @param shipStrategy
-	 *        The shipping strategy.
-	 * @param exchangeMode
-	 *        The data exchange mode (pipelined / batch / batch only for shuffles / ... )
-	 */
-	public DagConnection(OptimizerNode source, OptimizerNode target,
-							ShipStrategyType shipStrategy, ExecutionMode exchangeMode)
-	{
-		if (source == null || target == null) {
-			throw new NullPointerException("Source and target must not be null.");
-		}
-		this.source = source;
-		this.target = target;
-		this.shipStrategy = shipStrategy;
-		this.dataExchangeMode = exchangeMode;
-	}
-	
-	/**
-	 * Constructor to create a result from an operator that is not
-	 * consumed by another operator.
-	 * 
-	 * @param source
-	 *        The source node.
-	 * @param exchangeMode
-	 *        The data exchange mode (pipelined / batch).
-	 */
-	public DagConnection(OptimizerNode source, ExecutionMode exchangeMode) {
-		if (source == null) {
-			throw new NullPointerException("Source must not be null.");
-		}
-		this.source = source;
-		this.target = null;
-		this.shipStrategy = ShipStrategyType.NONE;
-		this.dataExchangeMode = exchangeMode;
-	}
-
-	/**
-	 * Gets the source of the connection.
-	 * 
-	 * @return The source Node.
-	 */
-	public OptimizerNode getSource() {
-		return this.source;
-	}
-
-	/**
-	 * Gets the target of the connection.
-	 * 
-	 * @return The target node.
-	 */
-	public OptimizerNode getTarget() {
-		return this.target;
-	}
-
-	/**
-	 * Gets the shipping strategy for this connection.
-	 * 
-	 * @return The connection's shipping strategy.
-	 */
-	public ShipStrategyType getShipStrategy() {
-		return this.shipStrategy;
-	}
-
-	/**
-	 * Sets the shipping strategy for this connection.
-	 * 
-	 * @param strategy
-	 *        The shipping strategy to be applied to this connection.
-	 */
-	public void setShipStrategy(ShipStrategyType strategy) {
-		this.shipStrategy = strategy;
-	}
-
-	/**
-	 * Gets the data exchange mode to use for this connection.
-	 *
-	 * @return The data exchange mode to use for this connection.
-	 */
-	public ExecutionMode getDataExchangeMode() {
-		if (dataExchangeMode == null) {
-			throw new IllegalStateException("This connection does not have the data exchange mode set");
-		}
-		return dataExchangeMode;
-	}
-
-	/**
-	 * Marks that this connection should do a decoupled data exchange (such as batched)
-	 * rather than pipelining the data. Connections are marked as pipeline breakers to
-	 * avoid deadlock situations.
-	 */
-	public void markBreaksPipeline() {
-		this.breakPipeline = true;
-	}
-
-	/**
-	 * Checks whether this connection is marked to break the pipeline.
-	 *
-	 * @return True, if this connection is marked to break the pipeline, false otherwise.
-	 */
-	public boolean isBreakingPipeline() {
-		return this.breakPipeline;
-	}
-
-	/**
-	 * Gets the interesting properties object for this connection.
-	 * If the interesting properties for this connection have not yet been set,
-	 * this method returns null.
-	 * 
-	 * @return The collection of all interesting properties, or null, if they have not yet been set.
-	 */
-	public InterestingProperties getInterestingProperties() {
-		return this.interestingProps;
-	}
-
-	/**
-	 * Sets the interesting properties for this connection.
-	 * 
-	 * @param props The interesting properties.
-	 */
-	public void setInterestingProperties(InterestingProperties props) {
-		if (this.interestingProps == null) {
-			this.interestingProps = props;
-		} else {
-			throw new IllegalStateException("Interesting Properties have already been set.");
-		}
-	}
-	
-	public void clearInterestingProperties() {
-		this.interestingProps = null;
-	}
-	
-	public void initMaxDepth() {
-		
-		if (this.maxDepth == -1) {
-			this.maxDepth = this.source.getMaxDepth() + 1;
-		} else {
-			throw new IllegalStateException("Maximum path depth has already been initialized.");
-		}
-	}
-	
-	public int getMaxDepth() {
-		if (this.maxDepth != -1) {
-			return this.maxDepth;
-		} else {
-			throw new IllegalStateException("Maximum path depth has not been initialized.");
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Estimates
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public long getEstimatedOutputSize() {
-		return this.source.getEstimatedOutputSize();
-	}
-
-	@Override
-	public long getEstimatedNumRecords() {
-		return this.source.getEstimatedNumRecords();
-	}
-	
-	@Override
-	public float getEstimatedAvgWidthPerOutputRecord() {
-		return this.source.getEstimatedAvgWidthPerOutputRecord();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	
-	public TempMode getMaterializationMode() {
-		return this.materializationMode;
-	}
-	
-	public void setMaterializationMode(TempMode materializationMode) {
-		this.materializationMode = materializationMode;
-	}
-	
-	public boolean isOnDynamicPath() {
-		return this.source.isOnDynamicPath();
-	}
-	
-	public int getCostWeight() {
-		return this.source.getCostWeight();
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public String toString() {
-		StringBuilder buf = new StringBuilder(50);
-		buf.append("Connection: ");
-
-		if (this.source == null) {
-			buf.append("null");
-		} else {
-			buf.append(this.source.getOperator().getName());
-			buf.append('(').append(this.source.getName()).append(')');
-		}
-
-		buf.append(" -> ");
-
-		if (this.shipStrategy != null) {
-			buf.append('[');
-			buf.append(this.shipStrategy.name());
-			buf.append(']').append(' ');
-		}
-
-		if (this.target == null) {
-			buf.append("null");
-		} else {
-			buf.append(this.target.getOperator().getName());
-			buf.append('(').append(this.target.getName()).append(')');
-		}
-
-		return buf.toString();
-	}
-}

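Since DagConnection is internal to the optimizer, its set-once contract is easy to miss: the interesting properties and the maximum depth may each be assigned exactly once, and a second attempt throws an IllegalStateException. A minimal sketch of the wiring, assuming two already-constructed OptimizerNode instances and an illustrative choice of ship strategy:

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.optimizer.dag.DagConnection;
import org.apache.flink.optimizer.dag.OptimizerNode;
import org.apache.flink.optimizer.dataproperties.InterestingProperties;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;

public final class ConnectionSketch {

	static DagConnection connect(OptimizerNode source, OptimizerNode target) {
		// fix the ship strategy up front; candidate enumeration will then not override it
		DagConnection conn = new DagConnection(
				source, target, ShipStrategyType.PARTITION_HASH, ExecutionMode.PIPELINED);

		conn.initMaxDepth();        // depth = source depth + 1; callable only once
		conn.markBreaksPipeline();  // request a decoupled exchange to avoid pipeline deadlocks

		// set-once: a second call throws IllegalStateException
		conn.setInterestingProperties(new InterestingProperties());
		return conn;
	}
}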
