flink-commits mailing list archives

From se...@apache.org
Subject [32/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
Date Fri, 20 Mar 2015 10:07:11 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
deleted file mode 100644
index 6f634fb..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ /dev/null
@@ -1,573 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.OptimizerNode.UnclosedBranchDescriptor;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plandump.DumpableConnection;
-import org.apache.flink.optimizer.plandump.DumpableNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.util.Visitable;
-
-/**
- * The representation of a data exchange between two operators. The data exchange can realize a shipping strategy,
- * which establishes global properties, and a local strategy, which establishes local properties.
- * <p>
- * Because we currently deal only with plans where the operator order is fixed, many properties are equal
- * among candidates and are determined prior to the enumeration (for example, constant/dynamic path membership).
- * Hence, many methods will delegate to the {@code OptimizerNode} that represents the node this candidate was
- * created for.
- */
-public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<PlanNode> {
-	
-	protected final OptimizerNode template;
-	
-	protected final List<Channel> outChannels;
-	
-	private List<NamedChannel> broadcastInputs;
-	
-	private final String nodeName; 
-	
-	private DriverStrategy driverStrategy;	// The local strategy (sorting / hashing, ...)
-	
-	protected LocalProperties localProps; 			// local properties of the data produced by this node
-
-	protected GlobalProperties globalProps;			// global properties of the data produced by this node
-	
-	protected Map<OptimizerNode, PlanNode> branchPlan; // the actual plan alternative chosen at a branch point
-	
-	protected Costs nodeCosts;						// the costs incurred by this node
-
-	protected Costs cumulativeCosts;					// the cumulative costs of all operators in the sub-tree
-	
-	private double relativeMemoryPerSubTask;					// the relative amount of memory dedicated to each sub-task
-	
-	private int parallelism;
-	
-	private boolean pFlag;							// flag for the internal pruning algorithm
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public PlanNode(OptimizerNode template, String nodeName, DriverStrategy strategy) {
-		this.outChannels = new ArrayList<Channel>(2);
-		this.broadcastInputs = new ArrayList<NamedChannel>();
-		this.template = template;
-		this.nodeName = nodeName;
-		this.driverStrategy = strategy;
-		
-		this.parallelism = template.getParallelism();
-
-		// check if there is a branch at this node. if yes, this candidate must be associated with
-		// the branching template node.
-		if (template.isBranching()) {
-			this.branchPlan = new HashMap<OptimizerNode, PlanNode>(6);
-			this.branchPlan.put(template, this);
-		}
-	}
-	
-	protected void mergeBranchPlanMaps(PlanNode pred1, PlanNode pred2) {
-		mergeBranchPlanMaps(pred1.branchPlan, pred2.branchPlan);
-	}
-	
-	protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) {
-		// merge the branchPlan maps according to the template's stack of unclosed branches
-		if (this.template.hasUnclosedBranches()) {
-			if (this.branchPlan == null) {
-				this.branchPlan = new HashMap<OptimizerNode, PlanNode>(8);
-			}
-	
-			for (UnclosedBranchDescriptor uc : this.template.getOpenBranches()) {
-				OptimizerNode brancher = uc.getBranchingNode();
-				PlanNode selectedCandidate = null;
-	
-				if (branchPlan1 != null) {
-					// predecessor 1 has branching children, see if it got the branch we are looking for
-					selectedCandidate = branchPlan1.get(brancher);
-				}
-				
-				if (selectedCandidate == null && branchPlan2 != null) {
-					// predecessor 2 has branching children, see if it got the branch we are looking for
-					selectedCandidate = branchPlan2.get(brancher);
-				}
-				
-				// it may be that the branch candidate is only found once the broadcast variables are set
-				if (selectedCandidate != null) {
-					this.branchPlan.put(brancher, selectedCandidate);
-				}
-			}
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                                           Accessors
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the node from the optimizer DAG for which this plan candidate node was created.
-	 * 
-	 * @return The optimizer's DAG node.
-	 */
-	public OptimizerNode getOriginalOptimizerNode() {
-		return this.template;
-	}
-	
-	/**
-	 * Gets the program operator that this node represents in the plan.
-	 * 
-	 * @return The program operator this node represents in the plan.
-	 */
-	public Operator<?> getProgramOperator() {
-		return this.template.getOperator();
-	}
-	
-	/**
-	 * Gets the name of the plan node.
-	 * 
-	 * @return The name of the plan node.
-	 */
-	public String getNodeName() {
-		return this.nodeName;
-	}
-	
-	public int getMemoryConsumerWeight() {
-		return this.driverStrategy.isMaterializing() ? 1 : 0;
-	}
-	
-	/**
-	 * Gets the relative memory dedicated to each sub-task for this node.
-	 * 
-	 * @return The relative memory per sub-task.
-	 */
-	public double getRelativeMemoryPerSubTask() {
-		return this.relativeMemoryPerSubTask;
-	}
-
-	/**
-	 * Sets the memory dedicated to each task for this node.
-	 * 
-	 * @param relativeMemoryPerSubtask The relative memory per sub-task
-	 */
-	public void setRelativeMemoryPerSubtask(double relativeMemoryPerSubtask) {
-		this.relativeMemoryPerSubTask = relativeMemoryPerSubtask;
-	}
-	
-	/**
-	 * Gets the driver strategy from this node. This determines, for example, whether a <i>match</i>
-	 * operator uses a merge or a hybrid hash strategy.
-	 * 
-	 * @return The driver strategy.
-	 */
-	public DriverStrategy getDriverStrategy() {
-		return this.driverStrategy;
-	}
-	
-	/**
-	 * Sets the driver strategy for this node. Usually should not be changed.
-	 * 
-	 * @param newDriverStrategy The driver strategy.
-	 */
-	public void setDriverStrategy(DriverStrategy newDriverStrategy) {
-		this.driverStrategy = newDriverStrategy;
-	}
-	
-	public void initProperties(GlobalProperties globals, LocalProperties locals) {
-		if (this.globalProps != null || this.localProps != null) {
-			throw new IllegalStateException();
-		}
-		this.globalProps = globals;
-		this.localProps = locals;
-	}
-	
-	/**
-	 * Gets the local properties from this PlanNode.
-	 *
-	 * @return The local properties.
-	 */
-	public LocalProperties getLocalProperties() {
-		return this.localProps;
-	}
-	
-	/**
-	 * Gets the global properties from this PlanNode.
-	 *
-	 * @return The global properties.
-	 */
-	public GlobalProperties getGlobalProperties() {
-		return this.globalProps;
-	}
-	
-	/**
-	 * Gets the costs incurred by this node. The costs also reflect the costs incurred by the shipping
-	 * strategies of the incoming connections.
-	 * 
-	 * @return The node-costs, or null, if not yet set.
-	 */
-	public Costs getNodeCosts() {
-		return this.nodeCosts;
-	}
-
-	/**
-	 * Gets the cumulative costs of this node. The cumulative costs are the sum of the costs
-	 * of this node and of all nodes in the subtree below this node.
-	 * 
-	 * @return The cumulative costs, or null, if not yet set.
-	 */
-	public Costs getCumulativeCosts() {
-		return this.cumulativeCosts;
-	}
-
-	public Costs getCumulativeCostsShare() {
-		if (this.cumulativeCosts == null) {
-			return null;
-		} else {
-			Costs result = cumulativeCosts.clone();
-			if (this.template.getOutgoingConnections() != null) {
-				int outDegree = this.template.getOutgoingConnections().size();
-				if (outDegree > 0) {
-					result.divideBy(outDegree);
-				}
-			}
-
-			return result;
-		}
-	}
-
-	
-	/**
-	 * Sets the basic cost for this node to the given value, and sets the cumulative costs
-	 * to those costs plus the cost shares of all inputs (regular and broadcast).
-	 * 
-	 * @param nodeCosts	 The already known costs for this node
-	 * 						(these costs are produced by a concrete {@code OptimizerNode} subclass).
-	 */
-	public void setCosts(Costs nodeCosts) {
-		// set the node costs
-		this.nodeCosts = nodeCosts;
-		
-		// the cumulative costs are the node costs plus the costs of all inputs
-		this.cumulativeCosts = nodeCosts.clone();
-		
-		// add all the normal inputs
-		for (PlanNode pred : getPredecessors()) {
-			
-			Costs parentCosts = pred.getCumulativeCostsShare();
-			if (parentCosts != null) {
-				this.cumulativeCosts.addCosts(parentCosts);
-			} else {
-				throw new CompilerException("Trying to set the costs of an operator before the predecessor costs are computed.");
-			}
-		}
-		
-		// add all broadcast variable inputs
-		if (this.broadcastInputs != null) {
-			for (NamedChannel nc : this.broadcastInputs) {
-				Costs bcInputCost = nc.getSource().getCumulativeCostsShare();
-				if (bcInputCost != null) {
-					this.cumulativeCosts.addCosts(bcInputCost);
-				} else {
-					throw new CompilerException("Trying to set the costs of an operator before the broadcast input costs are computed.");
-				}
-			}
-		}
-	}
-	
-	public void setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-	}
-	
-	public int getParallelism() {
-		return this.parallelism;
-	}
-	
-	public long getGuaranteedAvailableMemory() {
-		return this.template.getMinimalMemoryAcrossAllSubTasks();
-	}
-
-	public Map<OptimizerNode, PlanNode> getBranchPlan() {
-		return branchPlan;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//                               Input, Predecessors, Successors
-	// --------------------------------------------------------------------------------------------
-	
-	public abstract Iterable<Channel> getInputs();
-	
-	@Override
-	public abstract Iterable<PlanNode> getPredecessors();
-	
-	/**
-	 * Sets a list of all broadcast inputs attached to this node.
-	 */
-	public void setBroadcastInputs(List<NamedChannel> broadcastInputs) {
-		if (broadcastInputs != null) {
-			this.broadcastInputs = broadcastInputs;
-			
-			// update the branch map
-			for (NamedChannel nc : broadcastInputs) {
-				PlanNode source = nc.getSource();
-				
-				mergeBranchPlanMaps(branchPlan, source.branchPlan);
-			}
-		}
-		
-		// do a sanity check that if we are branching, we now have candidates for each branch point
-		if (this.template.hasUnclosedBranches()) {
-			if (this.branchPlan == null) {
-				throw new CompilerException("Branching and rejoining logic did not find a candidate for the branching point.");
-			}
-	
-			for (UnclosedBranchDescriptor uc : this.template.getOpenBranches()) {
-				OptimizerNode brancher = uc.getBranchingNode();
-				if (this.branchPlan.get(brancher) == null) {
-					throw new CompilerException("Branching and rejoining logic did not find a candidate for the branching point.");
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Gets a list of all broadcast inputs attached to this node.
-	 */
-	public List<NamedChannel> getBroadcastInputs() {
-		return this.broadcastInputs;
-	}
-	
-	/**
-	 * Adds a channel to a successor node to this node.
-	 * 
-	 * @param channel The channel to the successor.
-	 */
-	public void addOutgoingChannel(Channel channel) {
-		this.outChannels.add(channel);
-	}
-	
-	/**
-	 * Gets a list of all outgoing channels leading to successors.
-	 * 
-	 * @return A list of all channels leading to successors.
-	 */
-	public List<Channel> getOutgoingChannels() {
-		return this.outChannels;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//                                Miscellaneous
-	// --------------------------------------------------------------------------------------------
-	
-	public void updatePropertiesWithUniqueSets(Set<FieldSet> uniqueFieldCombinations) {
-		if (uniqueFieldCombinations == null || uniqueFieldCombinations.isEmpty()) {
-			return;
-		}
-		for (FieldSet fields : uniqueFieldCombinations) {
-			this.globalProps.addUniqueFieldCombination(fields);
-			this.localProps = this.localProps.addUniqueFields(fields);
-		}
-	}
-
-	public PlanNode getCandidateAtBranchPoint(OptimizerNode branchPoint) {
-		if (branchPlan == null) {
-			return null;
-		} else {
-			return this.branchPlan.get(branchPoint);
-		}
-	}
-	
-	/**
-	 * Sets the pruning marker to true.
-	 */
-	public void setPruningMarker() {
-		this.pFlag = true;
-	}
-	
-	/**
-	 * Checks whether the pruning marker was set.
-	 * 
-	 * @return True, if the pruning marker was set, false otherwise.
-	 */
-	public boolean isPruneMarkerSet() {
-		return this.pFlag;
-	}
-	
-	public boolean isOnDynamicPath() {
-		return this.template.isOnDynamicPath();
-	}
-	
-	public int getCostWeight() {
-		return this.template.getCostWeight();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Checks whether this node has a dam on the way down to the given source node. This method
-	 * returns one of three results: (a) the source node is not found as a (transitive) child of this node,
-	 * (b) the node is found, but no dam is on the path, or (c) the node is found and a dam is on
-	 * the path.
-	 * 
-	 * @param source The node on the path to which the dam is sought.
-	 * @return The result whether the node is found and whether a dam is on the path.
-	 */
-	public abstract SourceAndDamReport hasDamOnPathDownTo(PlanNode source);
-	
-	public FeedbackPropertiesMeetRequirementsReport checkPartialSolutionPropertiesMet(PlanNode partialSolution, GlobalProperties feedbackGlobal, LocalProperties feedbackLocal) {
-		if (this == partialSolution) {
-			return FeedbackPropertiesMeetRequirementsReport.PENDING;
-		}
-		
-		boolean found = false;
-		boolean allMet = true;
-		boolean allLocallyMet = true;
-		
-		for (Channel input : getInputs()) {
-			FeedbackPropertiesMeetRequirementsReport inputState = input.getSource().checkPartialSolutionPropertiesMet(partialSolution, feedbackGlobal, feedbackLocal);
-			
-			if (inputState == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
-				continue;
-			}
-			else if (inputState == FeedbackPropertiesMeetRequirementsReport.MET) {
-				found = true;
-				continue;
-			}
-			else if (inputState == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
-				return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
-			}
-			else {
-				found = true;
-				
-				// the partial solution was on the path here. check whether the channel requires
-				// certain properties that are met, or whether the channel introduces new properties
-				
-				// if the plan introduces new global properties, then we can stop looking whether
-				// the feedback properties are sufficient to meet the requirements
-				if (input.getShipStrategy() != ShipStrategyType.FORWARD && input.getShipStrategy() != ShipStrategyType.NONE) {
-					continue;
-				}
-				
-				// first check whether this channel requires something that is not met
-				if (input.getRequiredGlobalProps() != null && !input.getRequiredGlobalProps().isMetBy(feedbackGlobal)) {
-					return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
-				}
-				
-				// in general, not everything is met here already
-				allMet = false;
-				
-				// if the plan introduces new local properties, we can stop checking for matching local properties
-				if (inputState != FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET) {
-					
-					if (input.getLocalStrategy() == LocalStrategy.NONE) {
-						
-						if (input.getRequiredLocalProps() != null && !input.getRequiredLocalProps().isMetBy(feedbackLocal)) {
-							return FeedbackPropertiesMeetRequirementsReport.NOT_MET;
-						}
-						
-						allLocallyMet = false;
-					}
-				}
-			}
-		}
-		
-		if (!found) {
-			return FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION;
-		} else if (allMet) {
-			return FeedbackPropertiesMeetRequirementsReport.MET;
-		} else if (allLocallyMet) {
-			return FeedbackPropertiesMeetRequirementsReport.PENDING_LOCAL_MET;
-		} else {
-			return FeedbackPropertiesMeetRequirementsReport.PENDING;
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public String toString() {
-		return this.template.getName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy +
-				" [[ " + this.globalProps + " ]] [[ " + this.localProps + " ]]";
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public OptimizerNode getOptimizerNode() {
-		return this.template;
-	}
-
-	@Override
-	public PlanNode getPlanNode() {
-		return this;
-	}
-
-	@Override
-	public Iterable<DumpableConnection<PlanNode>> getDumpableInputs() {
-		List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>();
-		
-		for (Channel c : getInputs()) {
-			allInputs.add(c);
-		}
-		
-		for (NamedChannel c : getBroadcastInputs()) {
-			allInputs.add(c);
-		}
-		
-		return allInputs;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static enum SourceAndDamReport {
-		NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
-	}
-	
-	
-	
-	public static enum FeedbackPropertiesMeetRequirementsReport {
-		/** Indicates that the path is irrelevant */
-		NO_PARTIAL_SOLUTION,
-		
-		/** Indicates that whether the properties are met is still pending, and
-		 * depends on both global and local properties */
-		PENDING,
-		
-		/** Indicates that whether the properties are met is still pending, and
-		 * depends on global properties only */
-		PENDING_LOCAL_MET,
-		
-		/** Indicates that the properties have been determined to be met */
-		MET,
-		
-		/** Indicates that the properties have been determined not to be met */
-		NOT_MET;
-	}
-}

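Worth spelling out the cost bookkeeping in PlanNode above: setCosts() starts from a clone of the node's own costs and adds each predecessor's cumulative cost share, and getCumulativeCostsShare() divides a predecessor's cumulative cost by its out-degree so a branching node's cost is not double-counted when the branches later rejoin. Below is a minimal, self-contained sketch of that arithmetic; the Node type and plain doubles are stand-ins for illustration, not the Flink Costs/PlanNode classes.

    import java.util.ArrayList;
    import java.util.List;

    class CostShareSketch {

        static class Node {
            final List<Node> inputs = new ArrayList<>();
            int outDegree = 1;        // number of outgoing connections
            double cumulativeCost;    // own cost plus the shares of all inputs

            // mirrors PlanNode.getCumulativeCostsShare(): a node that feeds several
            // consumers splits its cumulative cost evenly among them
            double cumulativeShare() {
                return outDegree > 0 ? cumulativeCost / outDegree : cumulativeCost;
            }

            // mirrors PlanNode.setCosts(): cumulative = own cost + all input shares
            void setCosts(double ownCost) {
                cumulativeCost = ownCost;
                for (Node in : inputs) {
                    cumulativeCost += in.cumulativeShare();
                }
            }
        }

        public static void main(String[] args) {
            Node source = new Node();
            source.outDegree = 2;     // the source feeds two consumers (a branch)
            source.setCosts(10.0);

            Node consumer = new Node();
            consumer.inputs.add(source);
            consumer.setCosts(4.0);

            // prints 9.0: 4 own + 10/2 share of the branching source
            System.out.println(consumer.cumulativeCost);
        }
    }
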
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
deleted file mode 100644
index b928be7..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.runtime.operators.DamBehavior;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Visitor;
-
-/**
- * A plan candidate node for an operator with a single input.
- */
-public class SingleInputPlanNode extends PlanNode {
-	
-	protected final Channel input;
-	
-	protected final FieldList[] driverKeys;
-	
-	protected final boolean[][] driverSortOrders;
-	
-	private TypeComparatorFactory<?>[] comparators;
-	
-	public Object postPassHelper;
-	
-	// --------------------------------------------------------------------------------------------
-
-	public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input, DriverStrategy driverStrategy) {
-		this(template, nodeName, input, driverStrategy, null, null);
-	}
-	
-	public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input, 
-			DriverStrategy driverStrategy, FieldList driverKeyFields)
-	{
-		this(template, nodeName, input, driverStrategy, driverKeyFields, getTrueArray(driverKeyFields.size()));
-	}
-	
-	public SingleInputPlanNode(OptimizerNode template, String nodeName, Channel input, 
-			DriverStrategy driverStrategy, FieldList driverKeyFields, boolean[] driverSortOrders)
-	{
-		super(template, nodeName, driverStrategy);
-		this.input = input;
-		
-		this.comparators = new TypeComparatorFactory<?>[driverStrategy.getNumRequiredComparators()];
-		this.driverKeys = new FieldList[driverStrategy.getNumRequiredComparators()];
-		this.driverSortOrders = new boolean[driverStrategy.getNumRequiredComparators()][];
-		
-		if (driverStrategy.getNumRequiredComparators() > 0) {
-			this.driverKeys[0] = driverKeyFields;
-			this.driverSortOrders[0] = driverSortOrders;
-		}
-		
-		if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) {
-			this.input.setReplicationFactor(getParallelism());
-		}
-		
-		final PlanNode predNode = input.getSource();
-		
-		if (predNode.branchPlan != null && !predNode.branchPlan.isEmpty()) {
-			
-			if (this.branchPlan == null) {
-				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
-			}
-			this.branchPlan.putAll(predNode.branchPlan);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public SingleInputNode getSingleInputNode() {
-		if (this.template instanceof SingleInputNode) {
-			return (SingleInputNode) this.template;
-		} else {
-			throw new RuntimeException();
-		}
-	}
-	
-	/**
-	 * Gets the input channel to this node.
-	 * 
-	 * @return The input channel to this node.
-	 */
-	public Channel getInput() {
-		return this.input;
-	}
-	
-	/**
-	 * Gets the predecessor of this node, i.e. the source of the input channel.
-	 * 
-	 * @return The predecessor of this node.
-	 */
-	public PlanNode getPredecessor() {
-		return this.input.getSource();
-	}
-	
-	/**
-	 * Sets the key field indexes for the specified driver comparator.
-	 * 
-	 * @param keys The key field indexes for the specified driver comparator.
-	 * @param id The ID of the driver comparator.
-	 */
-	public void setDriverKeyInfo(FieldList keys, int id) {
-		this.setDriverKeyInfo(keys, getTrueArray(keys.size()), id);
-	}
-	
-	/**
-	 * Sets the key field information for the specified driver comparator.
-	 * 
-	 * @param keys The key field indexes for the specified driver comparator.
-	 * @param sortOrder The key sort order for the specified driver comparator.
-	 * @param id The ID of the driver comparator.
-	 */
-	public void setDriverKeyInfo(FieldList keys, boolean[] sortOrder, int id) {
-		if (id < 0 || id >= driverKeys.length) {
-			throw new CompilerException("Invalid id for driver key information. DriverStrategy requires only "
-											+ super.getDriverStrategy().getNumRequiredComparators() + " comparators.");
-		}
-		this.driverKeys[id] = keys;
-		this.driverSortOrders[id] = sortOrder;
-	}
-	
-	/**
-	 * Gets the key field indexes for the specified driver comparator.
-	 * 
-	 * @param id The id of the driver comparator for which the key field indexes are requested.
-	 * @return The key field indexes of the specified driver comparator.
-	 */
-	public FieldList getKeys(int id) {
-		return this.driverKeys[id];
-	}
-	
-	/**
-	 * Gets the sort order for the specified driver comparator.
-	 * 
-	 * @param id The id of the driver comparator for which the sort order is requested.
-	 * @return The sort order of the specified driver comparator.
-	 */
-	public boolean[] getSortOrders(int id) {
-		return driverSortOrders[id];
-	}
-	
-	/**
-	 * Gets the specified comparator from this PlanNode.
-	 * 
-	 * @param id The ID of the requested comparator.
-	 *
-	 * @return The specified comparator.
-	 */
-	public TypeComparatorFactory<?> getComparator(int id) {
-		return comparators[id];
-	}
-	
-	/**
-	 * Sets the specified comparator for this PlanNode.
-	 *
-	 * @param comparator The comparator to set.
-	 * @param id The ID of the comparator to set.
-	 */
-	public void setComparator(TypeComparatorFactory<?> comparator, int id) {
-		this.comparators[id] = comparator;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public void accept(Visitor<PlanNode> visitor) {
-		if (visitor.preVisit(this)) {
-			this.input.getSource().accept(visitor);
-			
-			for (Channel broadcastInput : getBroadcastInputs()) {
-				broadcastInput.getSource().accept(visitor);
-			}
-			
-			visitor.postVisit(this);
-		}
-	}
-
-
-	@Override
-	public Iterable<PlanNode> getPredecessors() {
-		if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
-			return Collections.singleton(this.input.getSource());
-		}
-		else {
-			List<PlanNode> preds = new ArrayList<PlanNode>();
-			preds.add(input.getSource());
-			
-			for (Channel c : getBroadcastInputs()) {
-				preds.add(c.getSource());
-			}
-			
-			return preds;
-		}
-	}
-
-
-	@Override
-	public Iterable<Channel> getInputs() {
-		return Collections.singleton(this.input);
-	}
-
-	@Override
-	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-		if (source == this) {
-			return FOUND_SOURCE;
-		}
-		SourceAndDamReport res = this.input.getSource().hasDamOnPathDownTo(source);
-		if (res == FOUND_SOURCE_AND_DAM) {
-			return FOUND_SOURCE_AND_DAM;
-		}
-		else if (res == FOUND_SOURCE) {
-			return (this.input.getLocalStrategy().dams() || this.input.getTempMode().breaksPipeline() ||
-					getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ?
-				FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
-		}
-		else {
-			// NOT_FOUND
-			// check the broadcast inputs
-			
-			for (NamedChannel nc : getBroadcastInputs()) {
-				SourceAndDamReport bcRes = nc.getSource().hasDamOnPathDownTo(source);
-				if (bcRes != NOT_FOUND) {
-					// broadcast inputs are always dams
-					return FOUND_SOURCE_AND_DAM;
-				}
-			}
-			return NOT_FOUND;
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	protected static boolean[] getTrueArray(int length) {
-		final boolean[] a = new boolean[length];
-		for (int i = 0; i < length; i++) {
-			a[i] = true;
-		}
-		return a;
-	}
-}

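The hasDamOnPathDownTo() recursion above is a three-valued search: NOT_FOUND propagates until the source node is reached, and a FOUND_SOURCE result is upgraded to FOUND_SOURCE_AND_DAM as soon as a materializing (damming) strategy lies on the path; broadcast inputs always count as dams. Below is a stripped-down sketch of the core recursion, with a hypothetical Node type and the broadcast rule omitted.

    import java.util.List;

    class DamSearchSketch {

        enum Report { NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM }

        static class Node {
            final boolean damsInput;   // does this node's strategy materialize (dam) the data?
            final List<Node> inputs;

            Node(boolean damsInput, List<Node> inputs) {
                this.damsInput = damsInput;
                this.inputs = inputs;
            }

            // mirrors the recursion in SingleInputPlanNode.hasDamOnPathDownTo():
            // once the source is found below, any dam on the way up upgrades the report
            Report hasDamOnPathDownTo(Node source) {
                if (source == this) {
                    return Report.FOUND_SOURCE;
                }
                for (Node in : inputs) {
                    Report r = in.hasDamOnPathDownTo(source);
                    if (r == Report.FOUND_SOURCE_AND_DAM) {
                        return r;
                    }
                    if (r == Report.FOUND_SOURCE) {
                        return damsInput ? Report.FOUND_SOURCE_AND_DAM : Report.FOUND_SOURCE;
                    }
                }
                return Report.NOT_FOUND;
            }
        }

        public static void main(String[] args) {
            Node source = new Node(false, List.of());
            Node sorter = new Node(true, List.of(source));   // a materializing strategy
            Node mapper = new Node(false, List.of(sorter));

            // prints FOUND_SOURCE_AND_DAM: the sorter dams the path down to the source
            System.out.println(mapper.hasDamOnPathDownTo(source));
        }
    }
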
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
deleted file mode 100644
index 451484d..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkJoinerPlanNode.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import java.util.List;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.SinkJoiner;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-/**
- * Plan candidate node for the sink joiner, a utility node that merges all data sinks into a single root of the plan DAG.
- */
-public class SinkJoinerPlanNode extends DualInputPlanNode {
-	
-	public SinkJoinerPlanNode(SinkJoiner template, Channel input1, Channel input2) {
-		super(template, "", input1, input2, DriverStrategy.BINARY_NO_OP);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public void setCosts(Costs nodeCosts) {
-		// the plan enumeration logic works as for regular two-input operators, which is important
-		// because of the branch handling logic. it does pick redistributing network channels
-		// between the sink and the sink joiner, because the sink joiner has a different DOP than the sink.
-		// we discard any such cost and simply use the sum of the costs from the two children.
-		
-		Costs totalCosts = getInput1().getSource().getCumulativeCosts().clone();
-		totalCosts.addCosts(getInput2().getSource().getCumulativeCosts());
-		super.setCosts(totalCosts);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public void getDataSinks(List<SinkPlanNode> sinks) {
-		final PlanNode in1 = this.input1.getSource();
-		final PlanNode in2 = this.input2.getSource();
-		
-		if (in1 instanceof SinkPlanNode) {
-			sinks.add((SinkPlanNode) in1);
-		} else if (in1 instanceof SinkJoinerPlanNode) {
-			((SinkJoinerPlanNode) in1).getDataSinks(sinks);
-		} else {
-			throw new CompilerException("Illegal child node for a sink joiner utility node: Neither Sink nor Sink Joiner");
-		}
-		
-		if (in2 instanceof SinkPlanNode) {
-			sinks.add((SinkPlanNode) in2);
-		} else if (in2 instanceof SinkJoinerPlanNode) {
-			((SinkJoinerPlanNode) in2).getDataSinks(sinks);
-		} else {
-			throw new CompilerException("Illegal child node for a sink joiner utility node: Neither Sink nor Sink Joiner");
-		}
-	}
-}

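getDataSinks() above walks a binary tree whose inner nodes are sink joiners and whose leaves are the actual sinks, accumulating the leaves into the caller's list. The same collect-into-a-list recursion as a self-contained sketch with hypothetical Sink/Joiner types:

    import java.util.ArrayList;
    import java.util.List;

    class SinkTreeSketch {

        interface Node {}

        static class Sink implements Node {
            final String name;
            Sink(String name) { this.name = name; }
        }

        // mirrors SinkJoinerPlanNode.getDataSinks(): sinks are the leaves of a
        // binary tree of joiner nodes, collected by a simple recursive walk
        static class Joiner implements Node {
            final Node left, right;
            Joiner(Node left, Node right) { this.left = left; this.right = right; }

            void collectSinks(List<Sink> out) {
                for (Node child : new Node[] { left, right }) {
                    if (child instanceof Sink) {
                        out.add((Sink) child);
                    } else if (child instanceof Joiner) {
                        ((Joiner) child).collectSinks(out);
                    } else {
                        throw new IllegalStateException("neither sink nor joiner");
                    }
                }
            }
        }

        public static void main(String[] args) {
            Joiner root = new Joiner(new Sink("a"), new Joiner(new Sink("b"), new Sink("c")));
            List<Sink> sinks = new ArrayList<>();
            root.collectSinks(sinks);
            System.out.println(sinks.size());   // prints 3
        }
    }
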
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
deleted file mode 100644
index 656e67f..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SinkPlanNode.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- * Plan candidate node for data flow sinks.
- */
-public class SinkPlanNode extends SingleInputPlanNode
-{
-	/**
-	 * Constructs a new sink candidate node that uses <i>NONE</i> as its local strategy. Note that
-	 * local sorting and range partitioning are handled by the incoming channel already.
-	 * 
-	 * @param template The template optimizer node that this candidate is created for.
-	 */
-	public SinkPlanNode(DataSinkNode template, String nodeName, Channel input) {
-		super(template, nodeName, input, DriverStrategy.NONE);
-		
-		this.globalProps = input.getGlobalProperties().clone();
-		this.localProps = input.getLocalProperties().clone();
-	}
-	
-	public DataSinkNode getSinkNode() {
-		if (this.template instanceof DataSinkNode) {
-			return (DataSinkNode) this.template;
-		} else {
-			throw new RuntimeException();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
deleted file mode 100644
index 63093dd..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SolutionSetPlanNode.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.Collections;
-import java.util.HashMap;
-
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.SolutionSetNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-/**
- * Plan candidate node for the solution set of a workset (delta) iteration.
- */
-public class SolutionSetPlanNode extends PlanNode {
-	
-	private static final Costs NO_COSTS = new Costs();
-	
-	private WorksetIterationPlanNode containingIterationNode;
-	
-	private final Channel initialInput;
-	
-	public Object postPassHelper;
-	
-	
-	public SolutionSetPlanNode(SolutionSetNode template, String nodeName,
-			GlobalProperties gProps, LocalProperties lProps,
-			Channel initialInput)
-	{
-		super(template, nodeName, DriverStrategy.NONE);
-		
-		this.globalProps = gProps;
-		this.localProps = lProps;
-		this.initialInput = initialInput;
-		
-		// the node incurs no cost
-		this.nodeCosts = NO_COSTS;
-		this.cumulativeCosts = NO_COSTS;
-		
-		if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) {
-			if (this.branchPlan == null) {
-				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
-			}
-			
-			this.branchPlan.putAll(initialInput.getSource().branchPlan);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public SolutionSetNode getSolutionSetNode() {
-		return (SolutionSetNode) this.template;
-	}
-	
-	public WorksetIterationPlanNode getContainingIterationNode() {
-		return this.containingIterationNode;
-	}
-	
-	public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) {
-		this.containingIterationNode = containingIterationNode;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public void accept(Visitor<PlanNode> visitor) {
-		if (visitor.preVisit(this)) {
-			visitor.postVisit(this);
-		}
-	}
-
-
-	@Override
-	public Iterable<PlanNode> getPredecessors() {
-		return Collections.<PlanNode>emptyList();
-	}
-
-
-	@Override
-	public Iterable<Channel> getInputs() {
-		return Collections.<Channel>emptyList();
-	}
-
-
-	@Override
-	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-		if (source == this) {
-			return FOUND_SOURCE_AND_DAM;
-		}
-		
-		SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source);
-		if (res == FOUND_SOURCE_AND_DAM || res == FOUND_SOURCE) {
-			return FOUND_SOURCE_AND_DAM;
-		} else {
-			return NOT_FOUND;
-		}
-	}
-}

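The accept() method above is the leaf case of the visitor protocol these plan nodes share: preVisit() decides whether to descend, postVisit() runs after all children, and a node with no inputs simply calls one after the other. A minimal sketch of that protocol with a hypothetical two-node chain:

    class VisitorSketch {

        interface Visitor<T> {
            boolean preVisit(T node);   // returning false prunes the subtree
            void postVisit(T node);
        }

        static class Node {
            final String name;
            final Node input;           // null for leaves such as the solution set
            Node(String name, Node input) { this.name = name; this.input = input; }

            // mirrors PlanNode.accept(): descend into inputs only if preVisit allows it;
            // a leaf's accept() collapses to preVisit immediately followed by postVisit
            void accept(Visitor<Node> visitor) {
                if (visitor.preVisit(this)) {
                    if (input != null) {
                        input.accept(visitor);
                    }
                    visitor.postVisit(this);
                }
            }
        }

        public static void main(String[] args) {
            Node leaf = new Node("solutionSet", null);
            Node join = new Node("join", leaf);

            join.accept(new Visitor<Node>() {
                public boolean preVisit(Node n) { System.out.println("pre  " + n.name); return true; }
                public void postVisit(Node n) { System.out.println("post " + n.name); }
            });
            // prints: pre join, pre solutionSet, post solutionSet, post join
        }
    }
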
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
deleted file mode 100644
index 11b7cc9..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SourcePlanNode.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.Collections;
-
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-/**
- * Plan candidate node for data flow sources that have no input and no special strategies.
- */
-public class SourcePlanNode extends PlanNode {
-	
-	private TypeSerializerFactory<?> serializer;
-	
-	/**
-	 * Constructs a new source candidate node that uses <i>NONE</i> as its local strategy.
-	 * 
-	 * @param template The template optimizer node that this candidate is created for.
-	 */
-	public SourcePlanNode(DataSourceNode template, String nodeName) {
-		this(template, nodeName, new GlobalProperties(), new LocalProperties());
-	}
-
-	public SourcePlanNode(DataSourceNode template, String nodeName, GlobalProperties gprops, LocalProperties lprops) {
-		super(template, nodeName, DriverStrategy.NONE);
-
-		this.globalProps = gprops;
-		this.localProps = lprops;
-		updatePropertiesWithUniqueSets(template.getUniqueFields());
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public DataSourceNode getDataSourceNode() {
-		return (DataSourceNode) this.template;
-	}
-	
-	/**
-	 * Gets the serializer from this PlanNode.
-	 *
-	 * @return The serializer.
-	 */
-	public TypeSerializerFactory<?> getSerializer() {
-		return serializer;
-	}
-	
-	/**
-	 * Sets the serializer for this PlanNode.
-	 *
-	 * @param serializer The serializer to set.
-	 */
-	public void setSerializer(TypeSerializerFactory<?> serializer) {
-		this.serializer = serializer;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public void accept(Visitor<PlanNode> visitor) {
-		if (visitor.preVisit(this)) {
-			visitor.postVisit(this);
-		}
-	}
-
-
-	@Override
-	public Iterable<PlanNode> getPredecessors() {
-		return Collections.<PlanNode>emptyList();
-	}
-
-
-	@Override
-	public Iterable<Channel> getInputs() {
-		return Collections.<Channel>emptyList();
-	}
-
-
-	@Override
-	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-		if (source == this) {
-			return FOUND_SOURCE;
-		} else {
-			return NOT_FOUND;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
deleted file mode 100644
index 880f2e3..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/StreamingPlan.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.flink.runtime.jobgraph.JobGraph;
-
-/**
- * Abstract class representing a Flink streaming plan.
- */
-public abstract class StreamingPlan implements FlinkPlan {
-
-	public abstract JobGraph getJobGraph();
-
-	public abstract String getStreamingPlanAsJSON();
-
-	public abstract void dumpStreamingPlanAsJSON(File file) throws IOException;
-
-}

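StreamingPlan fixes the contract every streaming plan must honor: produce a JobGraph and render itself as JSON, either to a string or to a file. A hypothetical standalone analogue of that contract follows; the JobGraph stand-in and the SingleVertexPlan subclass are invented for illustration and are not Flink code.

    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;

    class StreamingPlanSketch {

        // stand-in for org.apache.flink.runtime.jobgraph.JobGraph (illustration only)
        static class JobGraph {
            final String name;
            JobGraph(String name) { this.name = name; }
        }

        // mirrors the StreamingPlan contract: produce a job graph and render the
        // plan as JSON, either to a string or to a file
        abstract static class Plan {
            abstract JobGraph getJobGraph();
            abstract String getPlanAsJSON();

            void dumpPlanAsJSON(File file) throws IOException {
                try (FileWriter writer = new FileWriter(file)) {
                    writer.write(getPlanAsJSON());
                }
            }
        }

        static class SingleVertexPlan extends Plan {
            JobGraph getJobGraph() { return new JobGraph("example-job"); }
            String getPlanAsJSON() { return "{\"nodes\":[{\"id\":0}]}"; }
        }

        public static void main(String[] args) throws IOException {
            Plan plan = new SingleVertexPlan();
            plan.dumpPlanAsJSON(new File("plan.json"));
            System.out.println(plan.getJobGraph().name);
        }
    }
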
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
deleted file mode 100644
index 95adace..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetIterationPlanNode.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.WorksetIterationNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-/**
- * A node in the execution plan representing a workset iteration (delta iteration).
- */
-public class WorksetIterationPlanNode extends DualInputPlanNode implements IterationPlanNode {
-
-	private final SolutionSetPlanNode solutionSetPlanNode;
-	
-	private final WorksetPlanNode worksetPlanNode;
-	
-	private final PlanNode solutionSetDeltaPlanNode;
-	
-	private final PlanNode nextWorkSetPlanNode;
-	
-	private TypeSerializerFactory<?> worksetSerializer;
-	
-	private TypeSerializerFactory<?> solutionSetSerializer;
-	
-	private TypeComparatorFactory<?> solutionSetComparator;
-	
-	private boolean immediateSolutionSetUpdate;
-	
-	public Object postPassHelper;
-	
-	private TypeSerializerFactory<?> serializerForIterationChannel;
-	
-	// --------------------------------------------------------------------------------------------
-
-	public WorksetIterationPlanNode(WorksetIterationNode template, String nodeName, Channel initialSolutionSet, Channel initialWorkset,
-			SolutionSetPlanNode solutionSetPlanNode, WorksetPlanNode worksetPlanNode,
-			PlanNode nextWorkSetPlanNode, PlanNode solutionSetDeltaPlanNode)
-	{
-		super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.BINARY_NO_OP);
-		this.solutionSetPlanNode = solutionSetPlanNode;
-		this.worksetPlanNode = worksetPlanNode;
-		this.solutionSetDeltaPlanNode = solutionSetDeltaPlanNode;
-		this.nextWorkSetPlanNode = nextWorkSetPlanNode;
-		
-		mergeBranchPlanMaps();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public WorksetIterationNode getIterationNode() {
-		if (this.template instanceof WorksetIterationNode) {
-			return (WorksetIterationNode) this.template;
-		} else {
-			throw new RuntimeException();
-		}
-	}
-	
-	public SolutionSetPlanNode getSolutionSetPlanNode() {
-		return this.solutionSetPlanNode;
-	}
-	
-	public WorksetPlanNode getWorksetPlanNode() {
-		return this.worksetPlanNode;
-	}
-	
-	public PlanNode getSolutionSetDeltaPlanNode() {
-		return this.solutionSetDeltaPlanNode;
-	}
-	
-	public PlanNode getNextWorkSetPlanNode() {
-		return this.nextWorkSetPlanNode;
-	}
-	
-	public Channel getInitialSolutionSetInput() {
-		return getInput1();
-	}
-	
-	public Channel getInitialWorksetInput() {
-		return getInput2();
-	}
-	
-	public void setImmediateSolutionSetUpdate(boolean immediateUpdate) {
-		this.immediateSolutionSetUpdate = immediateUpdate;
-	}
-	
-	public boolean isImmediateSolutionSetUpdate() {
-		return this.immediateSolutionSetUpdate;
-	}
-	
-	public FieldList getSolutionSetKeyFields() {
-		return getIterationNode().getSolutionSetKeyFields();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public TypeSerializerFactory<?> getWorksetSerializer() {
-		return worksetSerializer;
-	}
-	
-	public void setWorksetSerializer(TypeSerializerFactory<?> worksetSerializer) {
-		this.worksetSerializer = worksetSerializer;
-	}
-	
-	public TypeSerializerFactory<?> getSolutionSetSerializer() {
-		return solutionSetSerializer;
-	}
-	
-	public void setSolutionSetSerializer(TypeSerializerFactory<?> solutionSetSerializer) {
-		this.solutionSetSerializer = solutionSetSerializer;
-	}
-	
-	public TypeComparatorFactory<?> getSolutionSetComparator() {
-		return solutionSetComparator;
-	}
-	
-	public void setSolutionSetComparator(TypeComparatorFactory<?> solutionSetComparator) {
-		this.solutionSetComparator = solutionSetComparator;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	public void setCosts(Costs nodeCosts) {
-		// add the costs from the step function
-		nodeCosts.addCosts(this.solutionSetDeltaPlanNode.getCumulativeCostsShare());
-		nodeCosts.addCosts(this.nextWorkSetPlanNode.getCumulativeCostsShare());
-
-		super.setCosts(nodeCosts);
-	}
-	
-	public int getMemoryConsumerWeight() {
-		// solution set index and workset back channel
-		return 2;
-	}
-	
-	@Override
-	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-		if (source == this) {
-			return FOUND_SOURCE;
-		}
-		
-		SourceAndDamReport fromOutside = super.hasDamOnPathDownTo(source);
-
-		if (fromOutside == FOUND_SOURCE_AND_DAM) {
-			return FOUND_SOURCE_AND_DAM;
-		}
-		else if (fromOutside == FOUND_SOURCE) {
-			// we always have a dam in the solution set index
-			return FOUND_SOURCE_AND_DAM;
-		} else {
-			SourceAndDamReport fromNextWorkset = nextWorkSetPlanNode.hasDamOnPathDownTo(source);
-
-			if (fromNextWorkset == FOUND_SOURCE_AND_DAM){
-				return FOUND_SOURCE_AND_DAM;
-			} else if (fromNextWorkset == FOUND_SOURCE){
-				return FOUND_SOURCE_AND_DAM;
-			} else {
-				return this.solutionSetDeltaPlanNode.hasDamOnPathDownTo(source);
-			}
-		}
-	}
-
-	@Override
-	public void acceptForStepFunction(Visitor<PlanNode> visitor) {
-		this.solutionSetDeltaPlanNode.accept(visitor);
-		this.nextWorkSetPlanNode.accept(visitor);
-	}
-
-	/**
-	 * Merging can only take place after the solutionSetDelta and nextWorkset plan nodes have been set,
-	 * because they may also contain some of the branching nodes.
-	 */
-	@Override
-	protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode, PlanNode> branchPlan2) {}
-
-	
-	protected void mergeBranchPlanMaps() {
-		Map<OptimizerNode, PlanNode> branchPlan1 = input1.getSource().branchPlan;
-		Map<OptimizerNode, PlanNode> branchPlan2 = input2.getSource().branchPlan;
-
-		// merge the branchPlan maps according to the template's stack of unclosed branches
-		if (this.template.hasUnclosedBranches()) {
-			if (this.branchPlan == null) {
-				this.branchPlan = new HashMap<OptimizerNode, PlanNode>(8);
-			}
-
-			for (OptimizerNode.UnclosedBranchDescriptor uc : this.template.getOpenBranches()) {
-				OptimizerNode brancher = uc.getBranchingNode();
-				PlanNode selectedCandidate = null;
-
-				if (branchPlan1 != null) {
-					// predecessor 1 has branching children, see if it got the branch we are looking for
-					selectedCandidate = branchPlan1.get(brancher);
-				}
-
-				if (selectedCandidate == null && branchPlan2 != null) {
-					// predecessor 2 has branching children, see if it got the branch we are looking for
-					selectedCandidate = branchPlan2.get(brancher);
-				}
-
-				if (selectedCandidate == null && getSolutionSetDeltaPlanNode() != null
-						&& getSolutionSetDeltaPlanNode().branchPlan != null) {
-					selectedCandidate = getSolutionSetDeltaPlanNode().branchPlan.get(brancher);
-				}
-
-				if (selectedCandidate == null && getNextWorkSetPlanNode() != null
-						&& getNextWorkSetPlanNode().branchPlan != null) {
-					selectedCandidate = getNextWorkSetPlanNode().branchPlan.get(brancher);
-				}
-
-				if (selectedCandidate == null) {
-					throw new CompilerException(
-							"Candidates for a node with open branches are missing information about the selected candidate.");
-				}
-
-				this.branchPlan.put(brancher, selectedCandidate);
-			}
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public TypeSerializerFactory<?> getSerializerForIterationChannel() {
-		return serializerForIterationChannel;
-	}
-	
-	public void setSerializerForIterationChannel(TypeSerializerFactory<?> serializerForIterationChannel) {
-		this.serializerForIterationChannel = serializerForIterationChannel;
-	}
-}

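The mergeBranchPlanMaps() override above resolves each open branch point by probing candidate maps in a fixed order (input 1, input 2, solution-set delta, next workset) and failing hard if none of them knows the branch. The same lookup chain as a self-contained sketch over plain string maps (hypothetical names, not the Flink types):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class BranchResolveSketch {

        // For each open branch point, probe the candidate maps in a fixed order and
        // keep the first hit; finding no candidate at all is a hard error. This
        // mirrors the lookup chain in WorksetIterationPlanNode.mergeBranchPlanMaps().
        static Map<String, String> resolve(List<String> openBranches,
                                           List<Map<String, String>> candidateMaps) {
            Map<String, String> merged = new HashMap<>();
            for (String brancher : openBranches) {
                String selected = null;
                for (Map<String, String> candidates : candidateMaps) {
                    if (candidates != null && candidates.containsKey(brancher)) {
                        selected = candidates.get(brancher);
                        break;
                    }
                }
                if (selected == null) {
                    throw new IllegalStateException("no candidate for branch point " + brancher);
                }
                merged.put(brancher, selected);
            }
            return merged;
        }

        public static void main(String[] args) {
            Map<String, String> fromInput1 = Map.of("b1", "candidateA");
            Map<String, String> fromDelta = Map.of("b2", "candidateB");

            // prints {b1=candidateA, b2=candidateB} (map order may vary)
            System.out.println(resolve(List.of("b1", "b2"), List.of(fromInput1, fromDelta)));
        }
    }
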
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
deleted file mode 100644
index 8d044d6..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/WorksetPlanNode.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.Collections;
-import java.util.HashMap;
-
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.WorksetNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DamBehavior;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-/**
- * Plan candidate node for the workset of a workset (delta) iteration.
- */
-public class WorksetPlanNode extends PlanNode {
-	
-	private static final Costs NO_COSTS = new Costs();
-	
-	private WorksetIterationPlanNode containingIterationNode;
-	
-	private final Channel initialInput;
-	
-	public Object postPassHelper;
-	
-	
-	public WorksetPlanNode(WorksetNode template, String nodeName,
-			GlobalProperties gProps, LocalProperties lProps,
-			Channel initialInput)
-	{
-		super(template, nodeName, DriverStrategy.NONE);
-		
-		this.globalProps = gProps;
-		this.localProps = lProps;
-		this.initialInput = initialInput;
-		
-		// the node incurs no cost
-		this.nodeCosts = NO_COSTS;
-		this.cumulativeCosts = NO_COSTS;
-		
-		if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) {
-			if (this.branchPlan == null) {
-				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
-			}
-			
-			this.branchPlan.putAll(initialInput.getSource().branchPlan);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public WorksetNode getWorksetNode() {
-		return (WorksetNode) this.template;
-	}
-	
-	public WorksetIterationPlanNode getContainingIterationNode() {
-		return this.containingIterationNode;
-	}
-	
-	public void setContainingIterationNode(WorksetIterationPlanNode containingIterationNode) {
-		this.containingIterationNode = containingIterationNode;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public void accept(Visitor<PlanNode> visitor) {
-		if (visitor.preVisit(this)) {
-			visitor.postVisit(this);
-		}
-	}
-
-
-	@Override
-	public Iterable<PlanNode> getPredecessors() {
-		return Collections.<PlanNode>emptyList();
-	}
-
-
-	@Override
-	public Iterable<Channel> getInputs() {
-		return Collections.<Channel>emptyList();
-	}
-
-
-	@Override
-	public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
-		if (source == this) {
-			return FOUND_SOURCE;
-		}
-		SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source);
-		if (res == FOUND_SOURCE_AND_DAM) {
-			return FOUND_SOURCE_AND_DAM;
-		}
-		else if (res == FOUND_SOURCE) {
-			return (this.initialInput.getLocalStrategy().dams() || 
-					this.initialInput.getTempMode().breaksPipeline() ||
-					getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ?
-				FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
-		}
-		else {
-			return NOT_FOUND;
-		}
-	}
-}
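
[Editor's note] The hasDamOnPathDownTo override above follows the pattern used across
the plan nodes: a FOUND_SOURCE report from upstream is upgraded to FOUND_SOURCE_AND_DAM
when the current hop itself dams the pipeline. A stripped-down sketch of that upgrade
rule, with plain booleans standing in for LocalStrategy.dams(), TempMode.breaksPipeline()
and DamBehavior.FULL_DAM (illustration only, not Flink API):

    // Illustration of the report-upgrade rule; booleans replace the Flink enums.
    public class DamUpgradeSketch {

        enum Report { NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM }

        static Report upgrade(Report upstream, boolean localStrategyDams,
                boolean tempModeBreaksPipeline, boolean driverIsFullDam) {
            if (upstream == Report.FOUND_SOURCE_AND_DAM) {
                return Report.FOUND_SOURCE_AND_DAM; // a dam found upstream is final
            }
            if (upstream == Report.FOUND_SOURCE) {
                boolean damsHere = localStrategyDams || tempModeBreaksPipeline || driverIsFullDam;
                return damsHere ? Report.FOUND_SOURCE_AND_DAM : Report.FOUND_SOURCE;
            }
            return Report.NOT_FOUND; // source not on this path at all
        }

        public static void main(String[] args) {
            System.out.println(upgrade(Report.FOUND_SOURCE, true, false, false));
            // prints: FOUND_SOURCE_AND_DAM
        }
    }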

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
deleted file mode 100644
index 3f8cb46..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableConnection.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plandump;
-
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-
-
-/**
- * A connection between two {@link DumpableNode}s, exposing the source node and the ship strategy.
- */
-public interface DumpableConnection<T extends DumpableNode<T>> {
-
-	public DumpableNode<T> getSource();
-	
-	public ShipStrategyType getShipStrategy();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
deleted file mode 100644
index 1bc0f0c..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/DumpableNode.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plandump;
-
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.plan.PlanNode;
-
-/**
- * A node in a dumpable plan graph; implemented by both optimizer DAG nodes and plan candidate nodes.
- */
-public interface DumpableNode<T extends DumpableNode<T>> {
-	
-	/**
-	 * Gets an iterator over the predecessors.
-	 * 
-	 * @return An iterator over the predecessors.
-	 */
-	Iterable<T> getPredecessors();
-	
-	Iterable<DumpableConnection<T>> getDumpableInputs();
-	
-	OptimizerNode getOptimizerNode();
-	
-	PlanNode getPlanNode();
-}
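
[Editor's note] These two small interfaces are what lets the JSON generator below walk
the pre-optimization DAG and the candidate plan with a single traversal. A toy version
of the duplicate-safe, id-assigning walk (hypothetical Dumpable interface and Node
class, not the Flink types):

    import java.util.ArrayList;
    import java.util.IdentityHashMap;
    import java.util.List;
    import java.util.Map;

    public class TraversalSketch {

        interface Dumpable<T extends Dumpable<T>> {
            Iterable<T> getPredecessors();
        }

        // concrete toy node
        static final class Node implements Dumpable<Node> {
            final List<Node> preds = new ArrayList<>();
            public Iterable<Node> getPredecessors() { return preds; }
        }

        // assign each node an id exactly once; ids are handed out before
        // recursing into predecessors, as in the dumper's visit method
        static <T extends Dumpable<T>> void assignIds(T node, Map<T, Integer> ids) {
            if (ids.containsKey(node)) {
                return; // reachable via several paths: count only once
            }
            ids.put(node, ids.size());
            for (T predecessor : node.getPredecessors()) {
                assignIds(predecessor, ids);
            }
        }

        public static void main(String[] args) {
            Node source = new Node();
            Node sinkA = new Node();
            Node sinkB = new Node();
            sinkA.preds.add(source);
            sinkB.preds.add(source); // diamond: source shared by both sinks
            Map<Node, Integer> ids = new IdentityHashMap<>();
            assignIds(sinkA, ids);
            assignIds(sinkB, ids);
            System.out.println(ids.size()); // prints 3, not 4
        }
    }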

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
deleted file mode 100644
index 6f918c0..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ /dev/null
@@ -1,657 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plandump;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.flink.api.common.operators.CompilerHints;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.BinaryUnionNode;
-import org.apache.flink.optimizer.dag.BulkIterationNode;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.DagConnection;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.dag.WorksetIterationNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.StringUtils;
-
-/**
- * Translates an optimizer DAG or an optimized plan into a JSON dump of nodes, strategies, properties, estimates, and costs.
- */
-public class PlanJSONDumpGenerator {
-	
-	private Map<DumpableNode<?>, Integer> nodeIds; // resolves pact nodes to ids
-
-	private int nodeCnt;
-	
-	private boolean encodeForHTML;
-
-	// --------------------------------------------------------------------------------------------
-	
-	public void setEncodeForHTML(boolean encodeForHTML) {
-		this.encodeForHTML = encodeForHTML;
-	}
-	
-	public boolean isEncodeForHTML() {
-		return encodeForHTML;
-	}
-	
-	
-	public void dumpPactPlanAsJSON(List<DataSinkNode> nodes, PrintWriter writer) {
-		@SuppressWarnings("unchecked")
-		List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes;
-		compilePlanToJSON(n, writer);
-	}
-	
-	public String getPactPlanAsJSON(List<DataSinkNode> nodes) {
-		StringWriter sw = new StringWriter();
-		PrintWriter pw = new PrintWriter(sw);
-		dumpPactPlanAsJSON(nodes, pw);
-		return sw.toString();
-	}
-	
-	public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, File toFile) throws IOException {
-		PrintWriter pw = null;
-		try {
-			pw = new PrintWriter(new FileOutputStream(toFile), false);
-			dumpOptimizerPlanAsJSON(plan, pw);
-			pw.flush();
-		} finally {
-			if (pw != null) {
-				pw.close();
-			}
-		}
-	}
-	
-	public String getOptimizerPlanAsJSON(OptimizedPlan plan) {
-		StringWriter sw = new StringWriter();
-		PrintWriter pw = new PrintWriter(sw);
-		dumpOptimizerPlanAsJSON(plan, pw);
-		pw.close();
-		return sw.toString();
-	}
-	
-	public void dumpOptimizerPlanAsJSON(OptimizedPlan plan, PrintWriter writer) {
-		Collection<SinkPlanNode> sinks = plan.getDataSinks();
-		if (sinks instanceof List) {
-			dumpOptimizerPlanAsJSON((List<SinkPlanNode>) sinks, writer);
-		} else {
-			List<SinkPlanNode> n = new ArrayList<SinkPlanNode>();
-			n.addAll(sinks);
-			dumpOptimizerPlanAsJSON(n, writer);
-		}
-	}
-	
-	public void dumpOptimizerPlanAsJSON(List<SinkPlanNode> nodes, PrintWriter writer) {
-		@SuppressWarnings("unchecked")
-		List<DumpableNode<?>> n = (List<DumpableNode<?>>) (List<?>) nodes;
-		compilePlanToJSON(n, writer);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private void compilePlanToJSON(List<DumpableNode<?>> nodes, PrintWriter writer) {
-		// initialization to assign node ids
-		this.nodeIds = new HashMap<DumpableNode<?>, Integer>();
-		this.nodeCnt = 0;
-		
-		// JSON header
-		writer.print("{\n\t\"nodes\": [\n\n");
-
-		// Generate JSON for plan
-		for (int i = 0; i < nodes.size(); i++) {
-			visit(nodes.get(i), writer, i == 0);
-		}
-		
-		// JSON Footer
-		writer.println("\n\t]\n}");
-	}
-
-	private boolean visit(DumpableNode<?> node, PrintWriter writer, boolean first) {
-		// check for duplicate traversal
-		if (this.nodeIds.containsKey(node)) {
-			return false;
-		}
-		
-		// assign an id first
-		this.nodeIds.put(node, this.nodeCnt++);
-		
-		// then recurse
-		for (DumpableNode<?> child : node.getPredecessors()) {
-			// Important: if the child was already visited, 'first' must not be
-			// reset to false, so it is only cleared when the visit emitted a node.
-			if (visit(child, writer, first)) {
-				first = false;
-			}
-		}
-		
-		// resolve the optimizer node that backs this dumpable node
-		final OptimizerNode n = node.getOptimizerNode();
-		
-		// ------------------ dump after the ascend ---------------------
-		// start a new node and output node id
-		if (!first) {
-			writer.print(",\n");	
-		}
-		// open the node
-		writer.print("\t{\n");
-		
-		// recurse if it is an iteration node
-		if (node instanceof BulkIterationNode || node instanceof BulkIterationPlanNode) {
-			
-			DumpableNode<?> innerChild = node instanceof BulkIterationNode ?
-					((BulkIterationNode) node).getNextPartialSolution() :
-					((BulkIterationPlanNode) node).getRootOfStepFunction();
-					
-			DumpableNode<?> begin = node instanceof BulkIterationNode ?
-				((BulkIterationNode) node).getPartialSolution() :
-				((BulkIterationPlanNode) node).getPartialSolutionPlanNode();
-			
-			writer.print("\t\t\"step_function\": [\n");
-			
-			visit(innerChild, writer, true);
-			
-			writer.print("\n\t\t],\n");
-			writer.print("\t\t\"partial_solution\": " + this.nodeIds.get(begin) + ",\n");
-			writer.print("\t\t\"next_partial_solution\": " + this.nodeIds.get(innerChild) + ",\n");
-		} else if (node instanceof WorksetIterationNode || node instanceof WorksetIterationPlanNode) {
-			
-			DumpableNode<?> worksetRoot = node instanceof WorksetIterationNode ?
-					((WorksetIterationNode) node).getNextWorkset() :
-					((WorksetIterationPlanNode) node).getNextWorkSetPlanNode();
-			DumpableNode<?> solutionDelta = node instanceof WorksetIterationNode ?
-					((WorksetIterationNode) node).getSolutionSetDelta() :
-					((WorksetIterationPlanNode) node).getSolutionSetDeltaPlanNode();
-					
-			DumpableNode<?> workset = node instanceof WorksetIterationNode ?
-						((WorksetIterationNode) node).getWorksetNode() :
-						((WorksetIterationPlanNode) node).getWorksetPlanNode();
-			DumpableNode<?> solutionSet = node instanceof WorksetIterationNode ?
-						((WorksetIterationNode) node).getSolutionSetNode() :
-						((WorksetIterationPlanNode) node).getSolutionSetPlanNode();
-			
-			writer.print("\t\t\"step_function\": [\n");
-			
-			visit(worksetRoot, writer, true);
-			visit(solutionDelta, writer, false);
-			
-			writer.print("\n\t\t],\n");
-			writer.print("\t\t\"workset\": " + this.nodeIds.get(workset) + ",\n");
-			writer.print("\t\t\"solution_set\": " + this.nodeIds.get(solutionSet) + ",\n");
-			writer.print("\t\t\"next_workset\": " + this.nodeIds.get(worksetRoot) + ",\n");
-			writer.print("\t\t\"solution_delta\": " + this.nodeIds.get(solutionDelta) + ",\n");
-		}
-		
-		// print the id
-		writer.print("\t\t\"id\": " + this.nodeIds.get(node));
-
-		
-		final String type;
-		String contents;
-		if (n instanceof DataSinkNode) {
-			type = "sink";
-			contents = n.getOperator().toString();
-		} else if (n instanceof DataSourceNode) {
-			type = "source";
-			contents = n.getOperator().toString();
-		}
-		else if (n instanceof BulkIterationNode) {
-			type = "bulk_iteration";
-			contents = n.getOperator().getName();
-		}
-		else if (n instanceof WorksetIterationNode) {
-			type = "workset_iteration";
-			contents = n.getOperator().getName();
-		}
-		else if (n instanceof BinaryUnionNode) {
-			type = "pact";
-			contents = "";
-		}
-		else {
-			type = "pact";
-			contents = n.getOperator().getName();
-		}
-		
-		contents = StringUtils.showControlCharacters(contents);
-		if (encodeForHTML) {
-			contents = StringEscapeUtils.escapeHtml4(contents);
-			contents = contents.replace("\\", "&#92;");
-		}
-		
-		
-		String name = n.getName();
-		if (name.equals("Reduce") && (node instanceof SingleInputPlanNode) && 
-				((SingleInputPlanNode) node).getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE) {
-			name = "Combine";
-		}
-		
-		// output the type identifier
-		writer.print(",\n\t\t\"type\": \"" + type + "\"");
-		
-		// output node name
-		writer.print(",\n\t\t\"pact\": \"" + name + "\"");
-		
-		// output node contents
-		writer.print(",\n\t\t\"contents\": \"" + contents + "\"");
-
-		// degree of parallelism
-		writer.print(",\n\t\t\"parallelism\": \""
-			+ (n.getParallelism() >= 1 ? n.getParallelism() : "default") + "\"");
-		
-		// output node predecessors
-		Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator();
-		String child1name = "", child2name = "";
-
-		if (inConns != null && inConns.hasNext()) {
-			// start predecessor list
-			writer.print(",\n\t\t\"predecessors\": [");
-			int inputNum = 0;
-			
-			while (inConns.hasNext()) {
-				final DumpableConnection<?> inConn = inConns.next();
-				final DumpableNode<?> source = inConn.getSource();
-				writer.print(inputNum == 0 ? "\n" : ",\n");
-				if (inputNum == 0) {
-					child1name += child1name.length() > 0 ? ", " : ""; 
-					child1name += source.getOptimizerNode().getOperator().getName();
-				} else if (inputNum == 1) {
-					child2name += child2name.length() > 0 ? ", " : "";
-					child2name += source.getOptimizerNode().getOperator().getName();
-				}
-
-				// output predecessor id
-				writer.print("\t\t\t{\"id\": " + this.nodeIds.get(source));
-
-				// output connection side
-				if (inConns.hasNext() || inputNum > 0) {
-					writer.print(", \"side\": \"" + (inputNum == 0 ? "first" : "second") + "\"");
-				}
-				// output shipping strategy and channel type
-				final Channel channel = (inConn instanceof Channel) ? (Channel) inConn : null; 
-				final ShipStrategyType shipType = channel != null ? channel.getShipStrategy() :
-						((DagConnection) inConn).getShipStrategy();
-					
-				String shipStrategy = null;
-				if (shipType != null) {
-					switch (shipType) {
-					case NONE:
-						// nothing
-						break;
-					case FORWARD:
-						shipStrategy = "Forward";
-						break;
-					case BROADCAST:
-						shipStrategy = "Broadcast";
-						break;
-					case PARTITION_HASH:
-						shipStrategy = "Hash Partition";
-						break;
-					case PARTITION_RANGE:
-						shipStrategy = "Range Partition";
-						break;
-					case PARTITION_RANDOM:
-						shipStrategy = "Redistribute";
-						break;
-					case PARTITION_FORCED_REBALANCE:
-						shipStrategy = "Rebalance";
-						break;
-					case PARTITION_CUSTOM:
-						shipStrategy = "Custom Partition";
-						break;
-					default:
-						throw new CompilerException("Unknown ship strategy '" + shipType.name()
-							+ "' in JSON generator.");
-					}
-				}
-				
-				if (shipStrategy != null && channel != null && channel.getShipStrategyKeys() != null && channel.getShipStrategyKeys().size() > 0) {
-					shipStrategy += " on " + (channel.getShipStrategySortOrder() == null ?
-							channel.getShipStrategyKeys().toString() :
-							Utils.createOrdering(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder()).toString());
-				}
-
-				if (shipStrategy != null) {
-					writer.print(", \"ship_strategy\": \"" + shipStrategy + "\"");
-				}
-				
-				if (channel != null) {
-					String localStrategy = null;
-					switch (channel.getLocalStrategy()) {
-					case NONE:
-						break;
-					case SORT:
-						localStrategy = "Sort";
-						break;
-					case COMBININGSORT:
-						localStrategy = "Sort (combining)";
-						break;
-					default:
-						throw new CompilerException("Unknown local strategy " + channel.getLocalStrategy().name());
-					}
-					
-					if (localStrategy != null && channel.getLocalStrategyKeys() != null && channel.getLocalStrategyKeys().size() > 0) {
-						localStrategy += " on " + (channel.getLocalStrategySortOrder() == null ?
-								channel.getLocalStrategyKeys().toString() :
-								Utils.createOrdering(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder()).toString());
-					}
-					
-					if (localStrategy != null) {
-						writer.print(", \"local_strategy\": \"" + localStrategy + "\"");
-					}
-					
-					if (channel.getTempMode() != TempMode.NONE) {
-						String tempMode = channel.getTempMode().toString();
-						writer.print(", \"temp_mode\": \"" + tempMode + "\"");
-					}
-				}
-				
-				writer.print('}');
-				inputNum++;
-			}
-			// finish predecessors
-			writer.print("\n\t\t]");
-		}
-		
-		//---------------------------------------------------------------------------------------
-		// the part below here is relevant only to plan nodes with concrete strategies, etc
-		//---------------------------------------------------------------------------------------
-
-		final PlanNode p = node.getPlanNode();
-		if (p == null) {
-			// finish node
-			writer.print("\n\t}");
-			return true;
-		}
-		// local strategy
-		String locString = null;
-		if (p.getDriverStrategy() != null) {
-			switch (p.getDriverStrategy()) {
-			case NONE:
-			case BINARY_NO_OP:
-				break;
-				
-			case UNARY_NO_OP:
-				locString = "No-Op";
-				break;
-				
-			case COLLECTOR_MAP:
-			case MAP:
-				locString = "Map";
-				break;
-				
-			case FLAT_MAP:
-				locString = "FlatMap";
-				break;
-				
-			case MAP_PARTITION:
-				locString = "Map Partition";
-				break;
-			
-			case ALL_REDUCE:
-				locString = "Reduce All";
-				break;
-			
-			case ALL_GROUP_REDUCE:
-			case ALL_GROUP_REDUCE_COMBINE:
-				locString = "Group Reduce All";
-				break;
-				
-			case SORTED_REDUCE:
-				locString = "Sorted Reduce";
-				break;
-				
-			case SORTED_PARTIAL_REDUCE:
-				locString = "Sorted Combine/Reduce";
-				break;
-
-			case SORTED_GROUP_REDUCE:
-				locString = "Sorted Group Reduce";
-				break;
-				
-			case SORTED_GROUP_COMBINE:
-				locString = "Sorted Combine";
-				break;
-
-			case HYBRIDHASH_BUILD_FIRST:
-				locString = "Hybrid Hash (build: " + child1name + ")";
-				break;
-			case HYBRIDHASH_BUILD_SECOND:
-				locString = "Hybrid Hash (build: " + child2name + ")";
-				break;
-				
-			case HYBRIDHASH_BUILD_FIRST_CACHED:
-				locString = "Hybrid Hash (CACHED) (build: " + child1name + ")";
-				break;
-			case HYBRIDHASH_BUILD_SECOND_CACHED:
-				locString = "Hybrid Hash (CACHED) (build: " + child2name + ")";
-				break;
-
-			case NESTEDLOOP_BLOCKED_OUTER_FIRST:
-				locString = "Nested Loops (Blocked Outer: " + child1name + ")";
-				break;
-			case NESTEDLOOP_BLOCKED_OUTER_SECOND:
-				locString = "Nested Loops (Blocked Outer: " + child2name + ")";
-				break;
-			case NESTEDLOOP_STREAMED_OUTER_FIRST:
-				locString = "Nested Loops (Streamed Outer: " + child1name + ")";
-				break;
-			case NESTEDLOOP_STREAMED_OUTER_SECOND:
-				locString = "Nested Loops (Streamed Outer: " + child2name + ")";
-				break;
-
-			case MERGE:
-				locString = "Merge";
-				break;
-
-			case CO_GROUP:
-				locString = "Co-Group";
-				break;
-
-			default:
-				locString = p.getDriverStrategy().name();
-				break;
-			}
-
-			if (locString != null) {
-				writer.print(",\n\t\t\"driver_strategy\": \"");
-				writer.print(locString);
-				writer.print("\"");
-			}
-		}
-		
-		{
-			// output node global properties
-			final GlobalProperties gp = p.getGlobalProperties();
-
-			writer.print(",\n\t\t\"global_properties\": [\n");
-
-			addProperty(writer, "Partitioning", gp.getPartitioning().name(), true);
-			if (gp.getPartitioningFields() != null) {
-				addProperty(writer, "Partitioned on", gp.getPartitioningFields().toString(), false);
-			}
-			if (gp.getPartitioningOrdering() != null) {
-				addProperty(writer, "Partitioning Order", gp.getPartitioningOrdering().toString(), false);	
-			}
-			else {
-				addProperty(writer, "Partitioning Order", "(none)", false);
-			}
-			if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) {
-				addProperty(writer, "Uniqueness", "not unique", false);
-			}
-			else {
-				addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false);	
-			}
-
-			writer.print("\n\t\t]");
-		}
-
-		{
-			// output node local properties
-			LocalProperties lp = p.getLocalProperties();
-
-			writer.print(",\n\t\t\"local_properties\": [\n");
-
-			if (lp.getOrdering() != null) {
-				addProperty(writer, "Order", lp.getOrdering().toString(), true);	
-			}
-			else {
-				addProperty(writer, "Order", "(none)", true);
-			}
-			if (lp.getGroupedFields() != null && lp.getGroupedFields().size() > 0) {
-				addProperty(writer, "Grouped on", lp.getGroupedFields().toString(), false);
-			} else {
-				addProperty(writer, "Grouping", "not grouped", false);	
-			}
-			if (n.getUniqueFields() == null || n.getUniqueFields().size() == 0) {
-				addProperty(writer, "Uniqueness", "not unique", false);
-			}
-			else {
-				addProperty(writer, "Uniqueness", n.getUniqueFields().toString(), false);	
-			}
-
-			writer.print("\n\t\t]");
-		}
-
-		// output node size estimates
-		writer.print(",\n\t\t\"estimates\": [\n");
-
-		addProperty(writer, "Est. Output Size", n.getEstimatedOutputSize() == -1 ? "(unknown)"
-			: formatNumber(n.getEstimatedOutputSize(), "B"), true);
-		addProperty(writer, "Est. Cardinality", n.getEstimatedNumRecords() == -1 ? "(unknown)"
-			: formatNumber(n.getEstimatedNumRecords()), false);
-
-		writer.print("\n\t\t]");
-
-		// output node cost
-		if (p.getNodeCosts() != null) {
-			writer.print(",\n\t\t\"costs\": [\n");
-
-			addProperty(writer, "Network", p.getNodeCosts().getNetworkCost() == -1 ? "(unknown)"
-				: formatNumber(p.getNodeCosts().getNetworkCost(), "B"), true);
-			addProperty(writer, "Disk I/O", p.getNodeCosts().getDiskCost() == -1 ? "(unknown)"
-				: formatNumber(p.getNodeCosts().getDiskCost(), "B"), false);
-			addProperty(writer, "CPU", p.getNodeCosts().getCpuCost() == -1 ? "(unknown)"
-				: formatNumber(p.getNodeCosts().getCpuCost(), ""), false);
-
-			addProperty(writer, "Cumulative Network",
-				p.getCumulativeCosts().getNetworkCost() == -1 ? "(unknown)" : formatNumber(p
-					.getCumulativeCosts().getNetworkCost(), "B"), false);
-			addProperty(writer, "Cumulative Disk I/O",
-				p.getCumulativeCosts().getDiskCost() == -1 ? "(unknown)" : formatNumber(p
-					.getCumulativeCosts().getDiskCost(), "B"), false);
-			addProperty(writer, "Cumulative CPU",
-				p.getCumulativeCosts().getCpuCost() == -1 ? "(unknown)" : formatNumber(p
-					.getCumulativeCosts().getCpuCost(), ""), false);
-
-			writer.print("\n\t\t]");
-		}
-
-		// output the node compiler hints
-		if (n.getOperator().getCompilerHints() != null) {
-			CompilerHints hints = n.getOperator().getCompilerHints();
-			CompilerHints defaults = new CompilerHints();
-
-			String size = hints.getOutputSize() == defaults.getOutputSize() ? "(none)" : String.valueOf(hints.getOutputSize());
-			String card = hints.getOutputCardinality() == defaults.getOutputCardinality() ? "(none)" : String.valueOf(hints.getOutputCardinality());
-			String width = hints.getAvgOutputRecordSize() == defaults.getAvgOutputRecordSize() ? "(none)" : String.valueOf(hints.getAvgOutputRecordSize());
-			String filter = hints.getFilterFactor() == defaults.getFilterFactor() ? "(none)" : String.valueOf(hints.getFilterFactor());
-			
-			writer.print(",\n\t\t\"compiler_hints\": [\n");
-
-			addProperty(writer, "Output Size (bytes)", size, true);
-			addProperty(writer, "Output Cardinality", card, false);
-			addProperty(writer, "Avg. Output Record Size (bytes)", width, false);
-			addProperty(writer, "Filter Factor", filter, false);
-
-			writer.print("\n\t\t]");
-		}
-
-		// finish node
-		writer.print("\n\t}");
-		return true;
-	}
-
-	private void addProperty(PrintWriter writer, String name, String value, boolean first) {
-		if (!first) {
-			writer.print(",\n");
-		}
-		writer.print("\t\t\t{ \"name\": \"");
-		writer.print(name);
-		writer.print("\", \"value\": \"");
-		writer.print(value);
-		writer.print("\" }");
-	}
-
-	public static final String formatNumber(double number) {
-		return formatNumber(number, "");
-	}
-
-	public static final String formatNumber(double number, String suffix) {
-		if (number <= 0.0) {
-			return String.valueOf(number);
-		}
-
-		int power = (int) Math.ceil(Math.log10(number));
-
-		int group = (power - 1) / 3;
-		if (group >= SIZE_SUFFIXES.length) {
-			group = SIZE_SUFFIXES.length - 1;
-		} else if (group < 0) {
-			group = 0;
-		}
-
-		// truncate fractional part
-		int beforeDecimal = power - group * 3;
-		if (power > beforeDecimal) {
-			for (int i = power - beforeDecimal; i > 0; i--) {
-				number /= 10;
-			}
-		}
-		
-		return group > 0 ? String.format(Locale.US, "%.2f %s%s", number, SIZE_SUFFIXES[group], suffix) :
-			String.format(Locale.US, "%.2f %s", number, suffix);
-	}
-
-	private static final char[] SIZE_SUFFIXES = { 0, 'K', 'M', 'G', 'T' };
-}
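
[Editor's note] For tooling that consumes these dumps after the rename: the generator
is usually driven via getOptimizerPlanAsJSON(OptimizedPlan), and the emitted document
is a single object with a "nodes" array. Size and cost values are pre-formatted by
formatNumber, which groups magnitudes in steps of 1000 with K/M/G/T suffixes. A
self-contained re-statement of that rule, including the suffix handling fixed above
(illustration only, not the Flink class itself):

    import java.util.Locale;

    public class FormatNumberSketch {

        private static final char[] SIZE_SUFFIXES = { 0, 'K', 'M', 'G', 'T' };

        static String formatNumber(double number, String suffix) {
            if (number <= 0.0) {
                return String.valueOf(number);
            }
            int power = (int) Math.ceil(Math.log10(number));
            int group = Math.min(Math.max((power - 1) / 3, 0), SIZE_SUFFIXES.length - 1);
            for (int i = 0; i < group * 3; i++) {
                number /= 10; // shift into the chosen magnitude group
            }
            return group > 0
                ? String.format(Locale.US, "%.2f %s%s", number, SIZE_SUFFIXES[group], suffix)
                : String.format(Locale.US, "%.2f %s", number, suffix);
        }

        public static void main(String[] args) {
            System.out.println(formatNumber(123456789, "B")); // 123.46 MB
            System.out.println(formatNumber(512, "B"));       // 512.00 B
        }
    }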

