flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [30/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
Date Fri, 20 Mar 2015 10:07:09 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java
deleted file mode 100644
index 2d8377e..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java
+++ /dev/null
@@ -1,579 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.postpass;
-
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.SemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerPostPassException;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dag.WorksetIterationNode;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-
-/**
- * 
- */
-public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> implements OptimizerPostPass {
-	
-	private boolean propagateParentSchemaDown = true;
-	
-	
-	public boolean isPropagateParentSchemaDown() {
-		return propagateParentSchemaDown;
-	}
-	
-	public void setPropagateParentSchemaDown(boolean propagateParentSchemaDown) {
-		this.propagateParentSchemaDown = propagateParentSchemaDown;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	// Generic schema inferring traversal
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void postPass(OptimizedPlan plan) {
-		for (SinkPlanNode sink : plan.getDataSinks()) {
-			traverse(sink, null, true);
-		}
-	}
-	
-	@SuppressWarnings("unchecked")
-	protected void traverse(PlanNode node, T parentSchema, boolean createUtilities) {
-		// distinguish the node types
-		if (node instanceof SinkPlanNode) {
-			SinkPlanNode sn = (SinkPlanNode) node;
-			Channel inchannel = sn.getInput();
-			
-			T schema = createEmptySchema();
-			sn.postPassHelper = schema;
-			
-			// add the sinks information to the schema
-			try {
-				getSinkSchema(sn, schema);
-			}
-			catch (ConflictingFieldTypeInfoException e) {
-				throw new CompilerPostPassException("Conflicting type infomation for the data sink '" +
-						sn.getSinkNode().getOperator().getName() + "'.");
-			}
-			
-			// descend to the input channel
-			try {
-				propagateToChannel(schema, inchannel, createUtilities);
-			}
-			catch (MissingFieldTypeInfoException ex) {
-				throw new CompilerPostPassException("Missing type infomation for the channel that inputs to the data sink '" +
-						sn.getSinkNode().getOperator().getName() + "'.");
-			}
-		}
-		else if (node instanceof SourcePlanNode) {
-			if (createUtilities) {
-				((SourcePlanNode) node).setSerializer(createSerializer(parentSchema, node));
-				// nothing else to be done here. the source has no input and no strategy itself
-			}
-		}
-		else if (node instanceof BulkIterationPlanNode) {
-			BulkIterationPlanNode iterationNode = (BulkIterationPlanNode) node;
-			
-			// get the nodes current schema
-			T schema;
-			if (iterationNode.postPassHelper == null) {
-				schema = createEmptySchema();
-				iterationNode.postPassHelper = schema;
-			} else {
-				schema = (T) iterationNode.postPassHelper;
-			}
-			schema.increaseNumConnectionsThatContributed();
-			
-			// add the parent schema to the schema
-			if (propagateParentSchemaDown) {
-				addSchemaToSchema(parentSchema, schema, iterationNode.getProgramOperator().getName());
-			}
-			
-			// check whether all outgoing channels have not yet contributed. come back later if not.
-			if (schema.getNumConnectionsThatContributed() < iterationNode.getOutgoingChannels().size()) {
-				return;
-			}
-			
-			if (iterationNode.getRootOfStepFunction() instanceof NAryUnionPlanNode) {
-				throw new CompilerException("Optimizer cannot compile an iteration step function where next partial solution is created by a Union node.");
-			}
-			
-			// traverse the termination criterion for the first time. create schema only, no utilities. Needed in case of intermediate termination criterion
-			if (iterationNode.getRootOfTerminationCriterion() != null) {
-				SingleInputPlanNode addMapper = (SingleInputPlanNode) iterationNode.getRootOfTerminationCriterion();
-				traverse(addMapper.getInput().getSource(), createEmptySchema(), false);
-				try {
-					addMapper.getInput().setSerializer(createSerializer(createEmptySchema()));
-				} catch (MissingFieldTypeInfoException e) {
-					throw new RuntimeException(e);
-				}
-			}
-			
-			// traverse the step function for the first time. create schema only, no utilities
-			traverse(iterationNode.getRootOfStepFunction(), schema, false);
-			
-			T pss = (T) iterationNode.getPartialSolutionPlanNode().postPassHelper;
-			if (pss == null) {
-				throw new CompilerException("Error in Optimizer Post Pass: Partial solution schema is null after first traversal of the step function.");
-			}
-			
-			// traverse the step function for the second time, taking the schema of the partial solution
-			traverse(iterationNode.getRootOfStepFunction(), pss, createUtilities);
-			
-			if (iterationNode.getRootOfTerminationCriterion() != null) {
-				SingleInputPlanNode addMapper = (SingleInputPlanNode) iterationNode.getRootOfTerminationCriterion();
-				traverse(addMapper.getInput().getSource(), createEmptySchema(), createUtilities);
-				try {
-					addMapper.getInput().setSerializer(createSerializer(createEmptySchema()));
-				} catch (MissingFieldTypeInfoException e) {
-					throw new RuntimeException(e);
-				}
-			}
-			
-			// take the schema from the partial solution node and add its fields to the iteration result schema.
-			// input and output schema need to be identical, so this is essentially a sanity check
-			addSchemaToSchema(pss, schema, iterationNode.getProgramOperator().getName());
-			
-			// set the serializer
-			if (createUtilities) {
-				iterationNode.setSerializerForIterationChannel(createSerializer(pss, iterationNode.getPartialSolutionPlanNode()));
-			}
-			
-			// done, we can now propagate our info down
-			try {
-				propagateToChannel(schema, iterationNode.getInput(), createUtilities);
-			} catch (MissingFieldTypeInfoException e) {
-				throw new CompilerPostPassException("Could not set up runtime strategy for input channel to node '"
-					+ iterationNode.getProgramOperator().getName() + "'. Missing type information for key field " +
-					e.getFieldNumber());
-			}
-		}
-		else if (node instanceof WorksetIterationPlanNode) {
-			WorksetIterationPlanNode iterationNode = (WorksetIterationPlanNode) node;
-			
-			// get the nodes current schema
-			T schema;
-			if (iterationNode.postPassHelper == null) {
-				schema = createEmptySchema();
-				iterationNode.postPassHelper = schema;
-			} else {
-				schema = (T) iterationNode.postPassHelper;
-			}
-			schema.increaseNumConnectionsThatContributed();
-			
-			// add the parent schema to the schema (which refers to the solution set schema)
-			if (propagateParentSchemaDown) {
-				addSchemaToSchema(parentSchema, schema, iterationNode.getProgramOperator().getName());
-			}
-			
-			// check whether all outgoing channels have not yet contributed. come back later if not.
-			if (schema.getNumConnectionsThatContributed() < iterationNode.getOutgoingChannels().size()) {
-				return;
-			}
-			if (iterationNode.getNextWorkSetPlanNode() instanceof NAryUnionPlanNode) {
-				throw new CompilerException("Optimizer cannot compile a workset iteration step function where the next workset is produced by a Union node.");
-			}
-			if (iterationNode.getSolutionSetDeltaPlanNode() instanceof NAryUnionPlanNode) {
-				throw new CompilerException("Optimizer cannot compile a workset iteration step function where the solution set delta is produced by a Union node.");
-			}
-			
-			// traverse the step function
-			// pass an empty schema to the next workset and the parent schema to the solution set delta
-			// these first traversals are schema only
-			traverse(iterationNode.getNextWorkSetPlanNode(), createEmptySchema(), false);
-			traverse(iterationNode.getSolutionSetDeltaPlanNode(), schema, false);
-			
-			T wss = (T) iterationNode.getWorksetPlanNode().postPassHelper;
-			T sss = (T) iterationNode.getSolutionSetPlanNode().postPassHelper;
-			
-			if (wss == null) {
-				throw new CompilerException("Error in Optimizer Post Pass: Workset schema is null after first traversal of the step function.");
-			}
-			if (sss == null) {
-				throw new CompilerException("Error in Optimizer Post Pass: Solution set schema is null after first traversal of the step function.");
-			}
-			
-			// make the second pass and instantiate the utilities
-			traverse(iterationNode.getNextWorkSetPlanNode(), wss, createUtilities);
-			traverse(iterationNode.getSolutionSetDeltaPlanNode(), sss, createUtilities);
-			
-			// add the types from the solution set schema to the iteration's own schema. since
-			// the solution set input and the result must have the same schema, this acts as a sanity check.
-			try {
-				for (Map.Entry<Integer, X> entry : sss) {
-					Integer pos = entry.getKey();
-					schema.addType(pos, entry.getValue());
-				}
-			} catch (ConflictingFieldTypeInfoException e) {
-				throw new CompilerPostPassException("Conflicting type information for field " + e.getFieldNumber()
-					+ " in node '" + iterationNode.getProgramOperator().getName() + "'. Contradicting types between the " +
-					"result of the iteration and the solution set schema: " + e.getPreviousType() + 
-					" and " + e.getNewType() + ". Most probable cause: Invalid constant field annotations.");
-			}
-			
-			// set the serializers and comparators
-			if (createUtilities) {
-				WorksetIterationNode optNode = iterationNode.getIterationNode();
-				iterationNode.setWorksetSerializer(createSerializer(wss, iterationNode.getWorksetPlanNode()));
-				iterationNode.setSolutionSetSerializer(createSerializer(sss, iterationNode.getSolutionSetPlanNode()));
-				try {
-					iterationNode.setSolutionSetComparator(createComparator(optNode.getSolutionSetKeyFields(), null, sss));
-				} catch (MissingFieldTypeInfoException ex) {
-					throw new CompilerPostPassException("Could not set up the solution set for workset iteration '" + 
-							optNode.getOperator().getName() + "'. Missing type information for key field " + ex.getFieldNumber() + '.');
-				}
-			}
-			
-			// done, we can now propagate our info down
-			try {
-				propagateToChannel(schema, iterationNode.getInitialSolutionSetInput(), createUtilities);
-				propagateToChannel(wss, iterationNode.getInitialWorksetInput(), createUtilities);
-			} catch (MissingFieldTypeInfoException ex) {
-				throw new CompilerPostPassException("Could not set up runtime strategy for input channel to node '"
-					+ iterationNode.getProgramOperator().getName() + "'. Missing type information for key field " +
-					ex.getFieldNumber());
-			}
-		}
-		else if (node instanceof SingleInputPlanNode) {
-			SingleInputPlanNode sn = (SingleInputPlanNode) node;
-			
-			// get the nodes current schema
-			T schema;
-			if (sn.postPassHelper == null) {
-				schema = createEmptySchema();
-				sn.postPassHelper = schema;
-			} else {
-				schema = (T) sn.postPassHelper;
-			}
-			schema.increaseNumConnectionsThatContributed();
-			SingleInputNode optNode = sn.getSingleInputNode();
-			
-			// add the parent schema to the schema
-			if (propagateParentSchemaDown) {
-				addSchemaToSchema(parentSchema, schema, optNode, 0);
-			}
-			
-			// check whether all outgoing channels have not yet contributed. come back later if not.
-			if (schema.getNumConnectionsThatContributed() < sn.getOutgoingChannels().size()) {
-				return;
-			}
-			
-			// add the nodes local information
-			try {
-				getSingleInputNodeSchema(sn, schema);
-			} catch (ConflictingFieldTypeInfoException e) {
-				throw new CompilerPostPassException(getConflictingTypeErrorMessage(e, optNode.getOperator().getName()));
-			}
-			
-			if (createUtilities) {
-				// parameterize the node's driver strategy
-				for(int i=0;i<sn.getDriverStrategy().getNumRequiredComparators();i++) {
-					try {
-						sn.setComparator(createComparator(sn.getKeys(i), sn.getSortOrders(i), schema),i);
-					} catch (MissingFieldTypeInfoException e) {
-						throw new CompilerPostPassException("Could not set up runtime strategy for node '" + 
-								optNode.getOperator().getName() + "'. Missing type information for key field " +
-								e.getFieldNumber());
-					}
-				}
-			}
-			
-			// done, we can now propagate our info down
-			try {
-				propagateToChannel(schema, sn.getInput(), createUtilities);
-			} catch (MissingFieldTypeInfoException e) {
-				throw new CompilerPostPassException("Could not set up runtime strategy for input channel to node '" +
-					optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
-			}
-			
-			// don't forget the broadcast inputs
-			for (Channel c: sn.getBroadcastInputs()) {
-				try {
-					propagateToChannel(createEmptySchema(), c, createUtilities);
-				} catch (MissingFieldTypeInfoException e) {
-					throw new CompilerPostPassException("Could not set up runtime strategy for broadcast channel in node '" +
-						optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
-				}
-			}
-		}
-		else if (node instanceof DualInputPlanNode) {
-			DualInputPlanNode dn = (DualInputPlanNode) node;
-			
-			// get the nodes current schema
-			T schema1;
-			T schema2;
-			if (dn.postPassHelper1 == null) {
-				schema1 = createEmptySchema();
-				schema2 = createEmptySchema();
-				dn.postPassHelper1 = schema1;
-				dn.postPassHelper2 = schema2;
-			} else {
-				schema1 = (T) dn.postPassHelper1;
-				schema2 = (T) dn.postPassHelper2;
-			}
-
-			schema1.increaseNumConnectionsThatContributed();
-			schema2.increaseNumConnectionsThatContributed();
-			TwoInputNode optNode = dn.getTwoInputNode();
-			
-			// add the parent schema to the schema
-			if (propagateParentSchemaDown) {
-				addSchemaToSchema(parentSchema, schema1, optNode, 0);
-				addSchemaToSchema(parentSchema, schema2, optNode, 1);
-			}
-			
-			// check whether all outgoing channels have not yet contributed. come back later if not.
-			if (schema1.getNumConnectionsThatContributed() < dn.getOutgoingChannels().size()) {
-				return;
-			}
-			
-			// add the nodes local information
-			try {
-				getDualInputNodeSchema(dn, schema1, schema2);
-			} catch (ConflictingFieldTypeInfoException e) {
-				throw new CompilerPostPassException(getConflictingTypeErrorMessage(e, optNode.getOperator().getName()));
-			}
-			
-			// parameterize the node's driver strategy
-			if (createUtilities) {
-				if (dn.getDriverStrategy().getNumRequiredComparators() > 0) {
-					// set the individual comparators
-					try {
-						dn.setComparator1(createComparator(dn.getKeysForInput1(), dn.getSortOrders(), schema1));
-						dn.setComparator2(createComparator(dn.getKeysForInput2(), dn.getSortOrders(), schema2));
-					} catch (MissingFieldTypeInfoException e) {
-						throw new CompilerPostPassException("Could not set up runtime strategy for node '" + 
-								optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
-					}
-					
-					// set the pair comparator
-					try {
-						dn.setPairComparator(createPairComparator(dn.getKeysForInput1(), dn.getKeysForInput2(), 
-							dn.getSortOrders(), schema1, schema2));
-					} catch (MissingFieldTypeInfoException e) {
-						throw new CompilerPostPassException("Could not set up runtime strategy for node '" + 
-								optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
-					}
-					
-				}
-			}
-			
-			// done, we can now propagate our info down
-			try {
-				propagateToChannel(schema1, dn.getInput1(), createUtilities);
-			} catch (MissingFieldTypeInfoException e) {
-				throw new CompilerPostPassException("Could not set up runtime strategy for the first input channel to node '"
-					+ optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
-			}
-			try {
-				propagateToChannel(schema2, dn.getInput2(), createUtilities);
-			} catch (MissingFieldTypeInfoException e) {
-				throw new CompilerPostPassException("Could not set up runtime strategy for the second input channel to node '"
-					+ optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
-			}
-			
-			// don't forget the broadcast inputs
-			for (Channel c: dn.getBroadcastInputs()) {
-				try {
-					propagateToChannel(createEmptySchema(), c, createUtilities);
-				} catch (MissingFieldTypeInfoException e) {
-					throw new CompilerPostPassException("Could not set up runtime strategy for broadcast channel in node '" +
-						optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
-				}
-			}
-		}
-		else if (node instanceof NAryUnionPlanNode) {
-			// only propagate the info down
-			try {
-				for (Channel channel : node.getInputs()) {
-					propagateToChannel(parentSchema, channel, createUtilities);
-				}
-			} catch (MissingFieldTypeInfoException ex) {
-				throw new CompilerPostPassException("Could not set up runtime strategy for the input channel to " +
-						" a union node. Missing type information for field " + ex.getFieldNumber());
-			}
-		}
-		// catch the sources of the iterative step functions
-		else if (node instanceof BulkPartialSolutionPlanNode || 
-				node instanceof SolutionSetPlanNode ||
-				node instanceof WorksetPlanNode)
-		{
-			// get the nodes current schema
-			T schema;
-			String name;
-			if (node instanceof BulkPartialSolutionPlanNode) {
-				BulkPartialSolutionPlanNode psn = (BulkPartialSolutionPlanNode) node;
-				if (psn.postPassHelper == null) {
-					schema = createEmptySchema();
-					psn.postPassHelper = schema;
-				} else {
-					schema = (T) psn.postPassHelper;
-				}
-				name = "partial solution of bulk iteration '" +
-					psn.getPartialSolutionNode().getIterationNode().getOperator().getName() + "'";
-			}
-			else if (node instanceof SolutionSetPlanNode) {
-				SolutionSetPlanNode ssn = (SolutionSetPlanNode) node;
-				if (ssn.postPassHelper == null) {
-					schema = createEmptySchema();
-					ssn.postPassHelper = schema;
-				} else {
-					schema = (T) ssn.postPassHelper;
-				}
-				name = "solution set of workset iteration '" +
-						ssn.getSolutionSetNode().getIterationNode().getOperator().getName() + "'";
-			}
-			else if (node instanceof WorksetPlanNode) {
-				WorksetPlanNode wsn = (WorksetPlanNode) node;
-				if (wsn.postPassHelper == null) {
-					schema = createEmptySchema();
-					wsn.postPassHelper = schema;
-				} else {
-					schema = (T) wsn.postPassHelper;
-				}
-				name = "workset of workset iteration '" +
-						wsn.getWorksetNode().getIterationNode().getOperator().getName() + "'";
-			} else {
-				throw new CompilerException();
-			}
-			
-			schema.increaseNumConnectionsThatContributed();
-			
-			// add the parent schema to the schema
-			addSchemaToSchema(parentSchema, schema, name);
-		}
-		else {
-			throw new CompilerPostPassException("Unknown node type encountered: " + node.getClass().getName());
-		}
-	}
-	
-	private void propagateToChannel(T schema, Channel channel, boolean createUtilities) throws MissingFieldTypeInfoException {
-		if (createUtilities) {
-			// the serializer always exists
-			channel.setSerializer(createSerializer(schema));
-			
-			// parameterize the ship strategy
-			if (channel.getShipStrategy().requiresComparator()) {
-				channel.setShipStrategyComparator(
-					createComparator(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder(), schema));
-			}
-			
-			// parameterize the local strategy
-			if (channel.getLocalStrategy().requiresComparator()) {
-				channel.setLocalStrategyComparator(
-					createComparator(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder(), schema));
-			}
-		}
-		
-		// propagate the channel's source model
-		traverse(channel.getSource(), schema, createUtilities);
-	}
-	
-	private void addSchemaToSchema(T sourceSchema, T targetSchema, String opName) {
-		try {
-			for (Map.Entry<Integer, X> entry : sourceSchema) {
-				Integer pos = entry.getKey();
-				targetSchema.addType(pos, entry.getValue());
-			}
-		} catch (ConflictingFieldTypeInfoException e) {
-			throw new CompilerPostPassException("Conflicting type information for field " + e.getFieldNumber()
-				+ " in node '" + opName + "' propagated from successor node. " +
-				"Conflicting types: " + e.getPreviousType() + " and " + e.getNewType() +
-				". Most probable cause: Invalid constant field annotations.");
-		}
-	}
-	
-	private void addSchemaToSchema(T sourceSchema, T targetSchema, OptimizerNode optNode, int input) {
-		try {
-			for (Map.Entry<Integer, X> entry : sourceSchema) {
-				Integer pos = entry.getKey();
-				SemanticProperties sprops = optNode.getSemanticProperties();
-
-				if (sprops != null && sprops.getForwardingTargetFields(input, pos) != null && sprops.getForwardingTargetFields(input, pos).contains(pos)) {
-					targetSchema.addType(pos, entry.getValue());
-				}
-			}
-		} catch (ConflictingFieldTypeInfoException e) {
-			throw new CompilerPostPassException("Conflicting type information for field " + e.getFieldNumber()
-				+ " in node '" + optNode.getOperator().getName() + "' propagated from successor node. " +
-				"Conflicting types: " + e.getPreviousType() + " and " + e.getNewType() +
-				". Most probable cause: Invalid constant field annotations.");
-		}
-	}
-	
-	private String getConflictingTypeErrorMessage(ConflictingFieldTypeInfoException e, String operatorName) {
-		return "Conflicting type information for field " + e.getFieldNumber()
-				+ " in node '" + operatorName + "' between types declared in the node's "
-				+ "contract and types inferred from successor contracts. Conflicting types: "
-				+ e.getPreviousType() + " and " + e.getNewType()
-				+ ". Most probable cause: Invalid constant field annotations.";
-	}
-	
-	private TypeSerializerFactory<?> createSerializer(T schema, PlanNode node) {
-		try {
-			return createSerializer(schema);
-		} catch (MissingFieldTypeInfoException e) {
-			throw new CompilerPostPassException("Missing type information while creating serializer for '" +
-					node.getProgramOperator().getName() + "'.");
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Type specific methods that extract schema information
-	// --------------------------------------------------------------------------------------------
-	
-	protected abstract T createEmptySchema();
-	
-	protected abstract void getSinkSchema(SinkPlanNode sink, T schema) throws CompilerPostPassException, ConflictingFieldTypeInfoException;
-	
-	protected abstract void getSingleInputNodeSchema(SingleInputPlanNode node, T schema) throws CompilerPostPassException, ConflictingFieldTypeInfoException;
-	
-	protected abstract void getDualInputNodeSchema(DualInputPlanNode node, T input1Schema, T input2Schema) throws CompilerPostPassException, ConflictingFieldTypeInfoException;
-
-	// --------------------------------------------------------------------------------------------
-	//  Methods to create serializers and comparators
-	// --------------------------------------------------------------------------------------------
-	
-	protected abstract TypeSerializerFactory<?> createSerializer(T schema) throws MissingFieldTypeInfoException;
-	
-	protected abstract TypeComparatorFactory<?> createComparator(FieldList fields, boolean[] directions, T schema) throws MissingFieldTypeInfoException;
-	
-	protected abstract TypePairComparatorFactory<?, ?> createPairComparator(FieldList fields1, FieldList fields2, boolean[] sortDirections,
-		T schema1, T schema2) throws MissingFieldTypeInfoException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
deleted file mode 100644
index 5fdf3dd..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.postpass;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.operators.DualInputOperator;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeinfo.AtomicType;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerPostPassException;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import org.apache.flink.optimizer.util.NoOpUnaryUdfOp;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- * The post-optimizer plan traversal. This traversal fills in the API specific utilities (serializers and
- * comparators).
- */
-public class JavaApiPostPass implements OptimizerPostPass {
-	
-	private final Set<PlanNode> alreadyDone = new HashSet<PlanNode>();
-
-	private ExecutionConfig executionConfig = null;
-	
-	@Override
-	public void postPass(OptimizedPlan plan) {
-
-		executionConfig = plan.getOriginalPactPlan().getExecutionConfig();
-
-		for (SinkPlanNode sink : plan.getDataSinks()) {
-			traverse(sink);
-		}
-	}
-	
-
-	protected void traverse(PlanNode node) {
-		if (!alreadyDone.add(node)) {
-			// already worked on that one
-			return;
-		}
-		
-		// distinguish the node types
-		if (node instanceof SinkPlanNode) {
-			// descend to the input channel
-			SinkPlanNode sn = (SinkPlanNode) node;
-			Channel inchannel = sn.getInput();
-			traverseChannel(inchannel);
-		}
-		else if (node instanceof SourcePlanNode) {
-			TypeInformation<?> typeInfo = getTypeInfoFromSource((SourcePlanNode) node);
-			((SourcePlanNode) node).setSerializer(createSerializer(typeInfo));
-		}
-		else if (node instanceof BulkIterationPlanNode) {
-			BulkIterationPlanNode iterationNode = (BulkIterationPlanNode) node;
-
-			if (iterationNode.getRootOfStepFunction() instanceof NAryUnionPlanNode) {
-				throw new CompilerException("Optimizer cannot compile an iteration step function where next partial solution is created by a Union node.");
-			}
-			
-			// traverse the termination criterion for the first time. create schema only, no utilities. Needed in case of intermediate termination criterion
-			if (iterationNode.getRootOfTerminationCriterion() != null) {
-				SingleInputPlanNode addMapper = (SingleInputPlanNode) iterationNode.getRootOfTerminationCriterion();
-				traverseChannel(addMapper.getInput());
-			}
-
-			BulkIterationBase<?> operator = (BulkIterationBase<?>) iterationNode.getProgramOperator();
-
-			// set the serializer
-			iterationNode.setSerializerForIterationChannel(createSerializer(operator.getOperatorInfo().getOutputType()));
-
-			// done, we can now propagate our info down
-			traverseChannel(iterationNode.getInput());
-			traverse(iterationNode.getRootOfStepFunction());
-		}
-		else if (node instanceof WorksetIterationPlanNode) {
-			WorksetIterationPlanNode iterationNode = (WorksetIterationPlanNode) node;
-			
-			if (iterationNode.getNextWorkSetPlanNode() instanceof NAryUnionPlanNode) {
-				throw new CompilerException("Optimizer cannot compile a workset iteration step function where the next workset is produced by a Union node.");
-			}
-			if (iterationNode.getSolutionSetDeltaPlanNode() instanceof NAryUnionPlanNode) {
-				throw new CompilerException("Optimizer cannot compile a workset iteration step function where the solution set delta is produced by a Union node.");
-			}
-			
-			DeltaIterationBase<?, ?> operator = (DeltaIterationBase<?, ?>) iterationNode.getProgramOperator();
-			
-			// set the serializers and comparators for the workset iteration
-			iterationNode.setSolutionSetSerializer(createSerializer(operator.getOperatorInfo().getFirstInputType()));
-			iterationNode.setWorksetSerializer(createSerializer(operator.getOperatorInfo().getSecondInputType()));
-			iterationNode.setSolutionSetComparator(createComparator(operator.getOperatorInfo().getFirstInputType(),
-					iterationNode.getSolutionSetKeyFields(), getSortOrders(iterationNode.getSolutionSetKeyFields(), null)));
-			
-			// traverse the inputs
-			traverseChannel(iterationNode.getInput1());
-			traverseChannel(iterationNode.getInput2());
-			
-			// traverse the step function
-			traverse(iterationNode.getSolutionSetDeltaPlanNode());
-			traverse(iterationNode.getNextWorkSetPlanNode());
-		}
-		else if (node instanceof SingleInputPlanNode) {
-			SingleInputPlanNode sn = (SingleInputPlanNode) node;
-			
-			if (!(sn.getOptimizerNode().getOperator() instanceof SingleInputOperator)) {
-				
-				// Special case for delta iterations
-				if(sn.getOptimizerNode().getOperator() instanceof NoOpUnaryUdfOp) {
-					traverseChannel(sn.getInput());
-					return;
-				} else {
-					throw new RuntimeException("Wrong operator type found in post pass.");
-				}
-			}
-			
-			SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) sn.getOptimizerNode().getOperator();
-			
-			// parameterize the node's driver strategy
-			for(int i=0;i<sn.getDriverStrategy().getNumRequiredComparators();i++) {
-				sn.setComparator(createComparator(singleInputOperator.getOperatorInfo().getInputType(), sn.getKeys(i),
-						getSortOrders(sn.getKeys(i), sn.getSortOrders(i))), i);
-			}
-			// done, we can now propagate our info down
-			traverseChannel(sn.getInput());
-			
-			// don't forget the broadcast inputs
-			for (Channel c: sn.getBroadcastInputs()) {
-				traverseChannel(c);
-			}
-		}
-		else if (node instanceof DualInputPlanNode) {
-			DualInputPlanNode dn = (DualInputPlanNode) node;
-			
-			if (!(dn.getOptimizerNode().getOperator() instanceof DualInputOperator)) {
-				throw new RuntimeException("Wrong operator type found in post pass.");
-			}
-			
-			DualInputOperator<?, ?, ?, ?> dualInputOperator = (DualInputOperator<?, ?, ?, ?>) dn.getOptimizerNode().getOperator();
-			
-			// parameterize the node's driver strategy
-			if (dn.getDriverStrategy().getNumRequiredComparators() > 0) {
-				dn.setComparator1(createComparator(dualInputOperator.getOperatorInfo().getFirstInputType(), dn.getKeysForInput1(),
-					getSortOrders(dn.getKeysForInput1(), dn.getSortOrders())));
-				dn.setComparator2(createComparator(dualInputOperator.getOperatorInfo().getSecondInputType(), dn.getKeysForInput2(),
-						getSortOrders(dn.getKeysForInput2(), dn.getSortOrders())));
-
-				dn.setPairComparator(createPairComparator(dualInputOperator.getOperatorInfo().getFirstInputType(),
-						dualInputOperator.getOperatorInfo().getSecondInputType()));
-				
-			}
-						
-			traverseChannel(dn.getInput1());
-			traverseChannel(dn.getInput2());
-			
-			// don't forget the broadcast inputs
-			for (Channel c: dn.getBroadcastInputs()) {
-				traverseChannel(c);
-			}
-			
-		}
-		// catch the sources of the iterative step functions
-		else if (node instanceof BulkPartialSolutionPlanNode ||
-				node instanceof SolutionSetPlanNode ||
-				node instanceof WorksetPlanNode)
-		{
-			// Do nothing :D
-		}
-		else if (node instanceof NAryUnionPlanNode){
-			// Traverse to all child channels
-			for (Channel channel : node.getInputs()) {
-				traverseChannel(channel);
-			}
-		}
-		else {
-			throw new CompilerPostPassException("Unknown node type encountered: " + node.getClass().getName());
-		}
-	}
-	
-	private void traverseChannel(Channel channel) {
-		
-		PlanNode source = channel.getSource();
-		Operator<?> javaOp = source.getProgramOperator();
-		
-//		if (!(javaOp instanceof BulkIteration) && !(javaOp instanceof JavaPlanNode)) {
-//			throw new RuntimeException("Wrong operator type found in post pass: " + javaOp);
-//		}
-
-		TypeInformation<?> type = javaOp.getOperatorInfo().getOutputType();
-
-
-		if(javaOp instanceof GroupReduceOperatorBase &&
-				(source.getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE || source.getDriverStrategy() == DriverStrategy.ALL_GROUP_REDUCE_COMBINE)) {
-			GroupReduceOperatorBase<?, ?, ?> groupNode = (GroupReduceOperatorBase<?, ?, ?>) javaOp;
-			type = groupNode.getInput().getOperatorInfo().getOutputType();
-		}
-		else if(javaOp instanceof PlanUnwrappingReduceGroupOperator &&
-				source.getDriverStrategy().equals(DriverStrategy.SORTED_GROUP_COMBINE)) {
-			PlanUnwrappingReduceGroupOperator<?, ?, ?> groupNode = (PlanUnwrappingReduceGroupOperator<?, ?, ?>) javaOp;
-			type = groupNode.getInput().getOperatorInfo().getOutputType();
-		}
-		
-		// the serializer always exists
-		channel.setSerializer(createSerializer(type));
-			
-		// parameterize the ship strategy
-		if (channel.getShipStrategy().requiresComparator()) {
-			channel.setShipStrategyComparator(createComparator(type, channel.getShipStrategyKeys(), 
-				getSortOrders(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder())));
-		}
-			
-		// parameterize the local strategy
-		if (channel.getLocalStrategy().requiresComparator()) {
-			channel.setLocalStrategyComparator(createComparator(type, channel.getLocalStrategyKeys(),
-				getSortOrders(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder())));
-		}
-		
-		// descend to the channel's source
-		traverse(channel.getSource());
-	}
-	
-	
-	@SuppressWarnings("unchecked")
-	private static <T> TypeInformation<T> getTypeInfoFromSource(SourcePlanNode node) {
-		Operator<?> op = node.getOptimizerNode().getOperator();
-		
-		if (op instanceof GenericDataSourceBase) {
-			return ((GenericDataSourceBase<T, ?>) op).getOperatorInfo().getOutputType();
-		} else {
-			throw new RuntimeException("Wrong operator type found in post pass.");
-		}
-	}
-	
-	private <T> TypeSerializerFactory<?> createSerializer(TypeInformation<T> typeInfo) {
-		TypeSerializer<T> serializer = typeInfo.createSerializer(executionConfig);
-
-		return new RuntimeSerializerFactory<T>(serializer, typeInfo.getTypeClass());
-	}
-	
-	@SuppressWarnings("unchecked")
-	private <T> TypeComparatorFactory<?> createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] sortOrder) {
-		
-		TypeComparator<T> comparator;
-		if (typeInfo instanceof CompositeType) {
-			comparator = ((CompositeType<T>) typeInfo).createComparator(keys.toArray(), sortOrder, 0, executionConfig);
-		}
-		else if (typeInfo instanceof AtomicType) {
-			// handle grouping of atomic types
-			comparator = ((AtomicType<T>) typeInfo).createComparator(sortOrder[0], executionConfig);
-		}
-		else {
-			throw new RuntimeException("Unrecognized type: " + typeInfo);
-		}
-
-		return new RuntimeComparatorFactory<T>(comparator);
-	}
-	
-	private static <T1 extends Tuple, T2 extends Tuple> TypePairComparatorFactory<T1,T2> createPairComparator(TypeInformation<?> typeInfo1, TypeInformation<?> typeInfo2) {
-		if (!(typeInfo1.isTupleType() || typeInfo1 instanceof PojoTypeInfo) && (typeInfo2.isTupleType() || typeInfo2 instanceof PojoTypeInfo)) {
-			throw new RuntimeException("The runtime currently supports only keyed binary operations (such as joins) on tuples and POJO types.");
-		}
-		
-//		@SuppressWarnings("unchecked")
-//		TupleTypeInfo<T1> info1 = (TupleTypeInfo<T1>) typeInfo1;
-//		@SuppressWarnings("unchecked")
-//		TupleTypeInfo<T2> info2 = (TupleTypeInfo<T2>) typeInfo2;
-		
-		return new RuntimePairComparatorFactory<T1,T2>();
-	}
-	
-	private static final boolean[] getSortOrders(FieldList keys, boolean[] orders) {
-		if (orders == null) {
-			orders = new boolean[keys.size()];
-			Arrays.fill(orders, true);
-		}
-		return orders;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/MissingFieldTypeInfoException.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/MissingFieldTypeInfoException.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/MissingFieldTypeInfoException.java
deleted file mode 100644
index b9f6bfa..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/MissingFieldTypeInfoException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.postpass;
-
-public final class MissingFieldTypeInfoException extends Exception {
-	
-	private static final long serialVersionUID = 8749941961302509358L;
-	
-	private final int fieldNumber;
-
-	public MissingFieldTypeInfoException(int fieldNumber) {
-		this.fieldNumber = fieldNumber;
-	}
-	
-	public int getFieldNumber() {
-		return fieldNumber;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/OptimizerPostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/OptimizerPostPass.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/OptimizerPostPass.java
deleted file mode 100644
index ba0b7c7..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/OptimizerPostPass.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.postpass;
-
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-
-/**
- * Interface for visitors that process the optimizer's plan. Typical post processing applications are schema
- * finalization or the generation/parameterization of utilities for the actual data model.
- */
-public interface OptimizerPostPass {
-	
-	/**
-	 * Central post processing function. Invoked by the optimizer after the best plan has
-	 * been determined.
-	 * 
-	 * @param plan The plan to be post processed.
-	 */
-	void postPass(OptimizedPlan plan);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
deleted file mode 100644
index 1fc4c34..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.postpass;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.types.Key;
-
-
-public class PostPassUtils {
-
-	public static <X> Class<? extends Key<?>>[] getKeys(AbstractSchema<Class< ? extends X>> schema, int[] fields) throws MissingFieldTypeInfoException {
-		@SuppressWarnings("unchecked")
-		Class<? extends Key<?>>[] keyTypes = new Class[fields.length];
-		
-		for (int i = 0; i < fields.length; i++) {
-			Class<? extends X> type = schema.getType(fields[i]);
-			if (type == null) {
-				throw new MissingFieldTypeInfoException(i);
-			} else if (Key.class.isAssignableFrom(type)) {
-				@SuppressWarnings("unchecked")
-				Class<? extends Key<?>> keyType = (Class<? extends Key<?>>) type;
-				keyTypes[i] = keyType;
-			} else {
-				throw new CompilerException("The field type " + type.getName() +
-						" cannot be used as a key because it does not implement the interface 'Key'");
-			}
-		}
-		
-		return keyTypes;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java
deleted file mode 100644
index 8a2d006..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.postpass;
-
-import org.apache.flink.api.common.operators.DualInputOperator;
-import org.apache.flink.api.common.operators.GenericDataSinkBase;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerPostPassException;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.types.Key;
-
-/**
- * Post pass implementation for the Record data model. Does only type inference and creates
- * serializers and comparators.
- */
-public class RecordModelPostPass extends GenericFlatTypePostPass<Class<? extends Key<?>>, SparseKeySchema> {
-	
-	// --------------------------------------------------------------------------------------------
-	//  Type specific methods that extract schema information
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	protected SparseKeySchema createEmptySchema() {
-		return new SparseKeySchema();
-	}
-	
-	@Override
-	protected void getSinkSchema(SinkPlanNode sinkPlanNode, SparseKeySchema schema) throws CompilerPostPassException {
-		GenericDataSinkBase<?> sink = sinkPlanNode.getSinkNode().getOperator();
-		Ordering partitioning = sink.getPartitionOrdering();
-		Ordering sorting = sink.getLocalOrder();
-		
-		try {
-			if (partitioning != null) {
-				addOrderingToSchema(partitioning, schema);
-			}
-			if (sorting != null) {
-				addOrderingToSchema(sorting, schema);
-			}
-		} catch (ConflictingFieldTypeInfoException ex) {
-			throw new CompilerPostPassException("Conflicting information found when adding data sink types. " +
-					"Probable reason is contradicting type infos for partitioning and sorting ordering.");
-		}
-	}
-	
-	@Override
-	protected void getSingleInputNodeSchema(SingleInputPlanNode node, SparseKeySchema schema)
-			throws CompilerPostPassException, ConflictingFieldTypeInfoException
-	{
-		// check that we got the right types
-		SingleInputOperator<?, ?, ?> contract = (SingleInputOperator<?, ?, ?>) node.getSingleInputNode().getOperator();
-		if (! (contract instanceof RecordOperator)) {
-			throw new CompilerPostPassException("Error: Operator is not a Record based contract. Wrong compiler invokation.");
-		}
-		RecordOperator recContract = (RecordOperator) contract;
-		
-		// add the information to the schema
-		int[] localPositions = contract.getKeyColumns(0);
-		Class<? extends Key<?>>[] types = recContract.getKeyClasses();
-		for (int i = 0; i < localPositions.length; i++) {
-			schema.addType(localPositions[i], types[i]);
-		}
-		
-		// this is a temporary fix, we should solve this more generic
-		if (contract instanceof GroupReduceOperatorBase) {
-			Ordering groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) contract).getGroupOrder();
-			if (groupOrder != null) {
-				addOrderingToSchema(groupOrder, schema);
-			}
-		}
-	}
-	
-	@Override
-	protected void getDualInputNodeSchema(DualInputPlanNode node, SparseKeySchema input1Schema, SparseKeySchema input2Schema)
-			throws CompilerPostPassException, ConflictingFieldTypeInfoException
-	{
-		// add the nodes local information. this automatically consistency checks
-		DualInputOperator<?, ?, ?, ?> contract = node.getTwoInputNode().getOperator();
-		if (! (contract instanceof RecordOperator)) {
-			throw new CompilerPostPassException("Error: Operator is not a Pact Record based contract. Wrong compiler invokation.");
-		}
-		
-		RecordOperator recContract = (RecordOperator) contract;
-		int[] localPositions1 = contract.getKeyColumns(0);
-		int[] localPositions2 = contract.getKeyColumns(1);
-		Class<? extends Key<?>>[] types = recContract.getKeyClasses();
-		
-		if (localPositions1.length != localPositions2.length) {
-			throw new CompilerException("Error: The keys for the first and second input have a different number of fields.");
-		}
-		
-		for (int i = 0; i < localPositions1.length; i++) {
-			input1Schema.addType(localPositions1[i], types[i]);
-		}
-		for (int i = 0; i < localPositions2.length; i++) {
-			input2Schema.addType(localPositions2[i], types[i]);
-		}
-		
-		
-		// this is a temporary fix, we should solve this more generic
-		if (contract instanceof CoGroupOperatorBase) {
-			Ordering groupOrder1 = ((CoGroupOperatorBase<?, ?, ?, ?>) contract).getGroupOrderForInputOne();
-			Ordering groupOrder2 = ((CoGroupOperatorBase<?, ?, ?, ?>) contract).getGroupOrderForInputTwo();
-			
-			if (groupOrder1 != null) {
-				addOrderingToSchema(groupOrder1, input1Schema);
-			}
-			if (groupOrder2 != null) {
-				addOrderingToSchema(groupOrder2, input2Schema);
-			}
-		}
-	}
-
-	private void addOrderingToSchema(Ordering o, SparseKeySchema schema) throws ConflictingFieldTypeInfoException {
-		for (int i = 0; i < o.getNumberOfFields(); i++) {
-			Integer pos = o.getFieldNumber(i);
-			Class<? extends Key<?>> type = o.getType(i);
-			schema.addType(pos, type);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Methods to create serializers and comparators
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	protected TypeSerializerFactory<?> createSerializer(SparseKeySchema schema) {
-		return RecordSerializerFactory.get();
-	}
-	
-	@Override
-	protected RecordComparatorFactory createComparator(FieldList fields, boolean[] directions, SparseKeySchema schema)
-			throws MissingFieldTypeInfoException
-	{
-		int[] positions = fields.toArray();
-		Class<? extends Key<?>>[] keyTypes = PostPassUtils.getKeys(schema, positions);
-		return new RecordComparatorFactory(positions, keyTypes, directions);
-	}
-	
-	@Override
-	protected RecordPairComparatorFactory createPairComparator(FieldList fields1, FieldList fields2, boolean[] sortDirections, 
-			SparseKeySchema schema1, SparseKeySchema schema2)
-	{
-		return RecordPairComparatorFactory.get();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
deleted file mode 100644
index e14888e..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.postpass;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.types.Key;
-
-/**
- * Class encapsulating a schema map (int column position -> column type) and a reference counter.
- */
-public class SparseKeySchema extends AbstractSchema<Class<? extends Key<?>>> {
-	
-	private final Map<Integer, Class<? extends Key<?>>> schema;
-	
-	
-	public SparseKeySchema() {
-		this.schema = new HashMap<Integer, Class<? extends Key<?>>>();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void addType(int key, Class<? extends Key<?>> type) throws ConflictingFieldTypeInfoException  {
-		Class<? extends Key<?>> previous = this.schema.put(key, type);
-		if (previous != null && previous != type) {
-			throw new ConflictingFieldTypeInfoException(key, previous, type);
-		}
-	}
-	
-	@Override
-	public Class<? extends Key<?>> getType(int field) {
-		return this.schema.get(field);
-	}
-	
-	@Override
-	public Iterator<Entry<Integer, Class<? extends Key<?>>>> iterator() {
-		return this.schema.entrySet().iterator();
-	}
-	
-	public int getNumTypes() {
-		return this.schema.size();
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public int hashCode() {
-		return this.schema.hashCode() ^ getNumConnectionsThatContributed();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof SparseKeySchema) {
-			SparseKeySchema other = (SparseKeySchema) obj;
-			return this.schema.equals(other.schema) && 
-					this.getNumConnectionsThatContributed() == other.getNumConnectionsThatContributed();
-		} else {
-			return false;
-		}
-	}
-	
-	@Override
-	public String toString() {
-		return "<" + getNumConnectionsThatContributed() + "> : " + this.schema.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/BinaryUnionReplacer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/BinaryUnionReplacer.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/BinaryUnionReplacer.java
deleted file mode 100644
index bd35b5c..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/BinaryUnionReplacer.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.IterationPlanNode;
-import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.util.Visitor;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A traversal that collects cascading binary unions into a single n-ary
- * union operator. The exception is, when on of the union inputs is materialized, such as in the
- * static-code-path-cache in iterations.
- */
-public class BinaryUnionReplacer implements Visitor<PlanNode> {
-
-	private final Set<PlanNode> seenBefore = new HashSet<PlanNode>();
-
-	@Override
-	public boolean preVisit(PlanNode visitable) {
-		if (this.seenBefore.add(visitable)) {
-			if (visitable instanceof IterationPlanNode) {
-				((IterationPlanNode) visitable).acceptForStepFunction(this);
-			}
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public void postVisit(PlanNode visitable) {
-
-		if (visitable instanceof BinaryUnionPlanNode) {
-
-			final BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
-			final Channel in1 = unionNode.getInput1();
-			final Channel in2 = unionNode.getInput2();
-
-			if (!unionNode.unionsStaticAndDynamicPath()) {
-
-				// both on static path, or both on dynamic path. we can collapse them
-				NAryUnionPlanNode newUnionNode;
-
-				List<Channel> inputs = new ArrayList<Channel>();
-				collect(in1, inputs);
-				collect(in2, inputs);
-
-				newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs,
-						unionNode.getGlobalProperties(), unionNode.getCumulativeCosts());
-
-				newUnionNode.setParallelism(unionNode.getParallelism());
-
-				for (Channel c : inputs) {
-					c.setTarget(newUnionNode);
-				}
-
-				for (Channel channel : unionNode.getOutgoingChannels()) {
-					channel.swapUnionNodes(newUnionNode);
-					newUnionNode.addOutgoingChannel(channel);
-				}
-			}
-			else {
-				// union between the static and the dynamic path. we need to handle this for now
-				// through a special union operator
-
-				// make sure that the first input is the cached (static) and the second input is the dynamic
-				if (in1.isOnDynamicPath()) {
-					BinaryUnionPlanNode newUnionNode = new BinaryUnionPlanNode(unionNode);
-
-					in1.setTarget(newUnionNode);
-					in2.setTarget(newUnionNode);
-
-					for (Channel channel : unionNode.getOutgoingChannels()) {
-						channel.swapUnionNodes(newUnionNode);
-						newUnionNode.addOutgoingChannel(channel);
-					}
-				}
-			}
-		}
-	}
-
-	public void collect(Channel in, List<Channel> inputs) {
-		if (in.getSource() instanceof NAryUnionPlanNode) {
-			// sanity check
-			if (in.getShipStrategy() != ShipStrategyType.FORWARD) {
-				throw new CompilerException("Bug: Plan generation for Unions picked a ship strategy between binary plan operators.");
-			}
-			if (!(in.getLocalStrategy() == null || in.getLocalStrategy() == LocalStrategy.NONE)) {
-				throw new CompilerException("Bug: Plan generation for Unions picked a local strategy between binary plan operators.");
-			}
-
-			inputs.addAll(((NAryUnionPlanNode) in.getSource()).getListOfInputs());
-		} else {
-			// is not a collapsed union node, so we take the channel directly
-			inputs.add(in);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/BranchesVisitor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/BranchesVisitor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/BranchesVisitor.java
deleted file mode 100644
index 4730546..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/BranchesVisitor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.optimizer.dag.IterationNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.util.Visitor;
-
-/**
- * This traversal of the optimizer DAG computes the information needed to track
- * branches and joins in the data flow. This is important to support plans
- * that are not a minimally connected DAG (Such plans are not trees, but at least one node feeds its
- * output into more than one other node).
- */
-public final class BranchesVisitor implements Visitor<OptimizerNode> {
-
-	@Override
-	public boolean preVisit(OptimizerNode node) {
-		return node.getOpenBranches() == null;
-	}
-
-	@Override
-	public void postVisit(OptimizerNode node) {
-		if (node instanceof IterationNode) {
-			((IterationNode) node).acceptForStepFunction(this);
-		}
-
-		node.computeUnclosedBranchStack();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
deleted file mode 100644
index 160ef95..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.api.common.ExecutionMode;
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.operators.GenericDataSinkBase;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Union;
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.base.CrossOperatorBase;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.api.common.operators.base.FilterOperatorBase;
-import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
-import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
-import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.dag.BinaryUnionNode;
-import org.apache.flink.optimizer.dag.BulkIterationNode;
-import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
-import org.apache.flink.optimizer.dag.CoGroupNode;
-import org.apache.flink.optimizer.dag.CollectorMapNode;
-import org.apache.flink.optimizer.dag.CrossNode;
-import org.apache.flink.optimizer.dag.DagConnection;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.dag.DataSourceNode;
-import org.apache.flink.optimizer.dag.FilterNode;
-import org.apache.flink.optimizer.dag.FlatMapNode;
-import org.apache.flink.optimizer.dag.GroupCombineNode;
-import org.apache.flink.optimizer.dag.GroupReduceNode;
-import org.apache.flink.optimizer.dag.JoinNode;
-import org.apache.flink.optimizer.dag.MapNode;
-import org.apache.flink.optimizer.dag.MapPartitionNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.PartitionNode;
-import org.apache.flink.optimizer.dag.ReduceNode;
-import org.apache.flink.optimizer.dag.SolutionSetNode;
-import org.apache.flink.optimizer.dag.SortPartitionNode;
-import org.apache.flink.optimizer.dag.WorksetIterationNode;
-import org.apache.flink.optimizer.dag.WorksetNode;
-import org.apache.flink.util.Visitor;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This traversal creates the optimizer DAG from a program.
- * It works as a visitor that walks the program's flow in a depth-first fashion, starting from the data sinks.
- * During the descend, it creates an optimizer node for each operator, respectively data source or -sink.
- * During the ascend, it connects the nodes to the full graph.
- */
-public class GraphCreatingVisitor implements Visitor<Operator<?>> {
-
-	private final Map<Operator<?>, OptimizerNode> con2node; // map from the operator objects to their
-															// corresponding optimizer nodes
-
-	private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan
-
-	private final int defaultParallelism; // the default degree of parallelism
-
-	private final GraphCreatingVisitor parent;	// reference to enclosing creator, in case of a recursive translation
-
-	private final ExecutionMode defaultDataExchangeMode;
-
-	private final boolean forceDOP;
-
-
-	public GraphCreatingVisitor(int defaultParallelism, ExecutionMode defaultDataExchangeMode) {
-		this(null, false, defaultParallelism, defaultDataExchangeMode, null);
-	}
-
-	private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int defaultParallelism,
-									ExecutionMode dataExchangeMode, HashMap<Operator<?>, OptimizerNode> closure) {
-		if (closure == null){
-			con2node = new HashMap<Operator<?>, OptimizerNode>();
-		} else {
-			con2node = closure;
-		}
-
-		this.sinks = new ArrayList<DataSinkNode>(2);
-		this.defaultParallelism = defaultParallelism;
-		this.parent = parent;
-		this.defaultDataExchangeMode = dataExchangeMode;
-		this.forceDOP = forceDOP;
-	}
-
-	public List<DataSinkNode> getSinks() {
-		return sinks;
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public boolean preVisit(Operator<?> c) {
-		// check if we have been here before
-		if (this.con2node.containsKey(c)) {
-			return false;
-		}
-
-		final OptimizerNode n;
-
-		// create a node for the operator (or sink or source) if we have not been here before
-		if (c instanceof GenericDataSinkBase) {
-			DataSinkNode dsn = new DataSinkNode((GenericDataSinkBase<?>) c);
-			this.sinks.add(dsn);
-			n = dsn;
-		}
-		else if (c instanceof GenericDataSourceBase) {
-			n = new DataSourceNode((GenericDataSourceBase<?, ?>) c);
-		}
-		else if (c instanceof MapOperatorBase) {
-			n = new MapNode((MapOperatorBase<?, ?, ?>) c);
-		}
-		else if (c instanceof MapPartitionOperatorBase) {
-			n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
-		}
-		else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
-			n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
-		}
-		else if (c instanceof FlatMapOperatorBase) {
-			n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
-		}
-		else if (c instanceof FilterOperatorBase) {
-			n = new FilterNode((FilterOperatorBase<?, ?>) c);
-		}
-		else if (c instanceof ReduceOperatorBase) {
-			n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
-		}
-		else if (c instanceof GroupCombineOperatorBase) {
-			n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
-		}
-		else if (c instanceof GroupReduceOperatorBase) {
-			n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
-		}
-		else if (c instanceof JoinOperatorBase) {
-			n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c);
-		}
-		else if (c instanceof CoGroupOperatorBase) {
-			n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);
-		}
-		else if (c instanceof CrossOperatorBase) {
-			n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c);
-		}
-		else if (c instanceof BulkIterationBase) {
-			n = new BulkIterationNode((BulkIterationBase<?>) c);
-		}
-		else if (c instanceof DeltaIterationBase) {
-			n = new WorksetIterationNode((DeltaIterationBase<?, ?>) c);
-		}
-		else if (c instanceof Union){
-			n = new BinaryUnionNode((Union<?>) c);
-		}
-		else if (c instanceof PartitionOperatorBase) {
-			n = new PartitionNode((PartitionOperatorBase<?>) c);
-		}
-		else if (c instanceof SortPartitionOperatorBase) {
-			n = new SortPartitionNode((SortPartitionOperatorBase<?>) c);
-		}
-		else if (c instanceof BulkIterationBase.PartialSolutionPlaceHolder) {
-			if (this.parent == null) {
-				throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
-			}
-
-			final BulkIterationBase.PartialSolutionPlaceHolder<?> holder = (BulkIterationBase.PartialSolutionPlaceHolder<?>) c;
-			final BulkIterationBase<?> enclosingIteration = holder.getContainingBulkIteration();
-			final BulkIterationNode containingIterationNode =
-						(BulkIterationNode) this.parent.con2node.get(enclosingIteration);
-
-			// catch this for the recursive translation of step functions
-			BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode);
-			p.setDegreeOfParallelism(containingIterationNode.getParallelism());
-			n = p;
-		}
-		else if (c instanceof DeltaIterationBase.WorksetPlaceHolder) {
-			if (this.parent == null) {
-				throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
-			}
-
-			final DeltaIterationBase.WorksetPlaceHolder<?> holder = (DeltaIterationBase.WorksetPlaceHolder<?>) c;
-			final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
-			final WorksetIterationNode containingIterationNode =
-						(WorksetIterationNode) this.parent.con2node.get(enclosingIteration);
-
-			// catch this for the recursive translation of step functions
-			WorksetNode p = new WorksetNode(holder, containingIterationNode);
-			p.setDegreeOfParallelism(containingIterationNode.getParallelism());
-			n = p;
-		}
-		else if (c instanceof DeltaIterationBase.SolutionSetPlaceHolder) {
-			if (this.parent == null) {
-				throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
-			}
-
-			final DeltaIterationBase.SolutionSetPlaceHolder<?> holder = (DeltaIterationBase.SolutionSetPlaceHolder<?>) c;
-			final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
-			final WorksetIterationNode containingIterationNode =
-						(WorksetIterationNode) this.parent.con2node.get(enclosingIteration);
-
-			// catch this for the recursive translation of step functions
-			SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode);
-			p.setDegreeOfParallelism(containingIterationNode.getParallelism());
-			n = p;
-		}
-		else {
-			throw new IllegalArgumentException("Unknown operator type: " + c);
-		}
-
-		this.con2node.put(c, n);
-
-		// set the parallelism only if it has not been set before. some nodes have a fixed DOP, such as the
-		// key-less reducer (all-reduce)
-		if (n.getParallelism() < 1) {
-			// set the degree of parallelism
-			int par = c.getDegreeOfParallelism();
-			if (par > 0) {
-				if (this.forceDOP && par != this.defaultParallelism) {
-					par = this.defaultParallelism;
-					Optimizer.LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +
-						"currently fixed to the parallelism of the surrounding operator (the iteration).");
-				}
-			} else {
-				par = this.defaultParallelism;
-			}
-			n.setDegreeOfParallelism(par);
-		}
-
-		return true;
-	}
-
-	@Override
-	public void postVisit(Operator<?> c) {
-
-		OptimizerNode n = this.con2node.get(c);
-
-		// first connect to the predecessors
-		n.setInput(this.con2node, this.defaultDataExchangeMode);
-		n.setBroadcastInputs(this.con2node, this.defaultDataExchangeMode);
-
-		// if the node represents a bulk iteration, we recursively translate the data flow now
-		if (n instanceof BulkIterationNode) {
-			final BulkIterationNode iterNode = (BulkIterationNode) n;
-			final BulkIterationBase<?> iter = iterNode.getIterationContract();
-
-			// pass a copy of the no iterative part into the iteration translation,
-			// in case the iteration references its closure
-			HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);
-
-			// first, recursively build the data flow for the step function
-			final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
-				iterNode.getParallelism(), defaultDataExchangeMode, closure);
-
-			BulkPartialSolutionNode partialSolution;
-
-			iter.getNextPartialSolution().accept(recursiveCreator);
-
-			partialSolution =  (BulkPartialSolutionNode) recursiveCreator.con2node.get(iter.getPartialSolution());
-			OptimizerNode rootOfStepFunction = recursiveCreator.con2node.get(iter.getNextPartialSolution());
-			if (partialSolution == null) {
-				throw new CompilerException("Error: The step functions result does not depend on the partial solution.");
-			}
-
-
-			OptimizerNode terminationCriterion = null;
-
-			if (iter.getTerminationCriterion() != null) {
-				terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
-
-				// no intermediate node yet, traverse from the termination criterion to build the missing parts
-				if (terminationCriterion == null) {
-					iter.getTerminationCriterion().accept(recursiveCreator);
-					terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
-				}
-			}
-
-			iterNode.setPartialSolution(partialSolution);
-			iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
-
-			// go over the contained data flow and mark the dynamic path nodes
-			StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
-			iterNode.acceptForStepFunction(identifier);
-		}
-		else if (n instanceof WorksetIterationNode) {
-			final WorksetIterationNode iterNode = (WorksetIterationNode) n;
-			final DeltaIterationBase<?, ?> iter = iterNode.getIterationContract();
-
-			// we need to ensure that both the next-workset and the solution-set-delta depend on the workset.
-			// One check is for free during the translation, we do the other check here as a pre-condition
-			{
-				StepFunctionValidator wsf = new StepFunctionValidator();
-				iter.getNextWorkset().accept(wsf);
-				if (!wsf.hasFoundWorkset()) {
-					throw new CompilerException("In the given program, the next workset does not depend on the workset. " +
-														"This is a prerequisite in delta iterations.");
-				}
-			}
-
-			// calculate the closure of the anonymous function
-			HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);
-
-			// first, recursively build the data flow for the step function
-			final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(
-					this, true, iterNode.getParallelism(), defaultDataExchangeMode, closure);
-
-			// descend from the solution set delta. check that it depends on both the workset
-			// and the solution set. If it does depend on both, this descend should create both nodes
-			iter.getSolutionSetDelta().accept(recursiveCreator);
-
-			final WorksetNode worksetNode = (WorksetNode) recursiveCreator.con2node.get(iter.getWorkset());
-
-			if (worksetNode == null) {
-				throw new CompilerException("In the given program, the solution set delta does not depend on the workset." +
-													"This is a prerequisite in delta iterations.");
-			}
-
-			iter.getNextWorkset().accept(recursiveCreator);
-
-			SolutionSetNode solutionSetNode = (SolutionSetNode) recursiveCreator.con2node.get(iter.getSolutionSet());
-
-			if (solutionSetNode == null || solutionSetNode.getOutgoingConnections() == null || solutionSetNode.getOutgoingConnections().isEmpty()) {
-				solutionSetNode = new SolutionSetNode((DeltaIterationBase.SolutionSetPlaceHolder<?>) iter.getSolutionSet(), iterNode);
-			}
-			else {
-				for (DagConnection conn : solutionSetNode.getOutgoingConnections()) {
-					OptimizerNode successor = conn.getTarget();
-
-					if (successor.getClass() == JoinNode.class) {
-						// find out which input to the match the solution set is
-						JoinNode mn = (JoinNode) successor;
-						if (mn.getFirstPredecessorNode() == solutionSetNode) {
-							mn.makeJoinWithSolutionSet(0);
-						} else if (mn.getSecondPredecessorNode() == solutionSetNode) {
-							mn.makeJoinWithSolutionSet(1);
-						} else {
-							throw new CompilerException();
-						}
-					}
-					else if (successor.getClass() == CoGroupNode.class) {
-						CoGroupNode cg = (CoGroupNode) successor;
-						if (cg.getFirstPredecessorNode() == solutionSetNode) {
-							cg.makeCoGroupWithSolutionSet(0);
-						} else if (cg.getSecondPredecessorNode() == solutionSetNode) {
-							cg.makeCoGroupWithSolutionSet(1);
-						} else {
-							throw new CompilerException();
-						}
-					}
-					else {
-						throw new InvalidProgramException(
-								"Error: The only operations allowed on the solution set are Join and CoGroup.");
-					}
-				}
-			}
-
-			final OptimizerNode nextWorksetNode = recursiveCreator.con2node.get(iter.getNextWorkset());
-			final OptimizerNode solutionSetDeltaNode = recursiveCreator.con2node.get(iter.getSolutionSetDelta());
-
-			// set the step function nodes to the iteration node
-			iterNode.setPartialSolution(solutionSetNode, worksetNode);
-			iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode, defaultDataExchangeMode);
-
-			// go over the contained data flow and mark the dynamic path nodes
-			StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
-			iterNode.acceptForStepFunction(pathIdentifier);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/IdAndEstimatesVisitor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/IdAndEstimatesVisitor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/IdAndEstimatesVisitor.java
deleted file mode 100644
index b5c09e5..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/IdAndEstimatesVisitor.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.dag.DagConnection;
-import org.apache.flink.optimizer.dag.IterationNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.util.Visitor;
-
-/**
- * This traversal of the optimizer DAG assigns IDs to each node (in a pre-order fashion),
- * and calls each node to compute its estimates. The latter happens in the postVisit function,
- * where it is guaranteed that all predecessors have computed their estimates.
- */
-public class IdAndEstimatesVisitor implements Visitor<OptimizerNode> {
-
-	private final DataStatistics statistics;
-
-	private int id = 1;
-
-	public IdAndEstimatesVisitor(DataStatistics statistics) {
-		this.statistics = statistics;
-	}
-
-	@Override
-	public boolean preVisit(OptimizerNode visitable) {
-		return visitable.getId() == -1;
-	}
-
-	@Override
-	public void postVisit(OptimizerNode visitable) {
-		// the node ids
-		visitable.initId(this.id++);
-
-		// connections need to figure out their maximum path depths
-		for (DagConnection conn : visitable.getIncomingConnections()) {
-			conn.initMaxDepth();
-		}
-		for (DagConnection conn : visitable.getBroadcastConnections()) {
-			conn.initMaxDepth();
-		}
-
-		// the estimates
-		visitable.computeOutputEstimates(this.statistics);
-
-		// if required, recurse into the step function
-		if (visitable instanceof IterationNode) {
-			((IterationNode) visitable).acceptForStepFunction(this);
-		}
-	}
-}


Mime
View raw message