flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [6/8] flink git commit: [FLINK-1671] [optimizer] Add data exchange mode to optimizer classes
Date Tue, 17 Mar 2015 09:16:26 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
index 31e13ae..7670277 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler.dataproperties;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -30,16 +31,15 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.util.Utils;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * This class represents global properties of the data at a certain point in the plan.
- * Global properties are properties that describe data across different partitions.
- * <p>
- * Currently, the properties are the following: A partitioning type (ANY, HASH, RANGE), and EITHER an ordering (for range partitioning)
- * or an FieldSet with the hash partitioning columns.
+ * Global properties are properties that describe data across different partitions, such as
+ * whether the data is hash partitioned, range partitioned, replicated, etc.
  */
 public class GlobalProperties implements Cloneable {
 
@@ -67,9 +67,9 @@ public class GlobalProperties implements Cloneable {
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Sets the partitioning property for the global properties.
+	 * Sets this global properties to represent a hash partitioning.
 	 * 
-	 * @param partitionedFields 
+	 * @param partitionedFields The key fields on which the data is hash partitioned.
 	 */
 	public void setHashPartitioned(FieldList partitionedFields) {
 		if (partitionedFields == null) {
@@ -355,30 +355,64 @@ public class GlobalProperties implements Cloneable {
 	}
 
 
-	public void parameterizeChannel(Channel channel, boolean globalDopChange) {
+	public void parameterizeChannel(Channel channel, boolean globalDopChange,
+									ExecutionMode exchangeMode, boolean breakPipeline) {
+
+		ShipStrategyType shipType;
+		FieldList partitionKeys;
+		boolean[] sortDirection;
+		Partitioner<?> partitioner;
+
 		switch (this.partitioning) {
 			case RANDOM_PARTITIONED:
-				channel.setShipStrategy(globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD);
+				shipType = globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD;
+				partitionKeys = null;
+				sortDirection = null;
+				partitioner = null;
 				break;
+
 			case FULL_REPLICATION:
-				channel.setShipStrategy(ShipStrategyType.BROADCAST);
+				shipType = ShipStrategyType.BROADCAST;
+				partitionKeys = null;
+				sortDirection = null;
+				partitioner = null;
 				break;
+
 			case ANY_PARTITIONING:
 			case HASH_PARTITIONED:
-				channel.setShipStrategy(ShipStrategyType.PARTITION_HASH, Utils.createOrderedFromSet(this.partitioningFields));
+				shipType = ShipStrategyType.PARTITION_HASH;
+				partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
+				sortDirection = null;
+				partitioner = null;
 				break;
+
 			case RANGE_PARTITIONED:
-				channel.setShipStrategy(ShipStrategyType.PARTITION_RANGE, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
+				shipType = ShipStrategyType.PARTITION_RANGE;
+				partitionKeys = this.ordering.getInvolvedIndexes();
+				sortDirection = this.ordering.getFieldSortDirections();
+				partitioner = null;
 				break;
+
 			case FORCED_REBALANCED:
-				channel.setShipStrategy(ShipStrategyType.PARTITION_RANDOM);
+				shipType = ShipStrategyType.PARTITION_RANDOM;
+				partitionKeys = null;
+				sortDirection = null;
+				partitioner = null;
 				break;
+
 			case CUSTOM_PARTITIONING:
-				channel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, this.partitioningFields, this.customPartitioner);
+				shipType = ShipStrategyType.PARTITION_CUSTOM;
+				partitionKeys = this.partitioningFields;
+				sortDirection = null;
+				partitioner = this.customPartitioner;
 				break;
+
 			default:
 				throw new CompilerException("Unsupported partitioning strategy");
 		}
+
+		DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline);
+		channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode);
 	}
 
 	// ------------------------------------------------------------------------
@@ -438,7 +472,7 @@ public class GlobalProperties implements Cloneable {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public static final GlobalProperties combine(GlobalProperties gp1, GlobalProperties gp2) {
+	public static GlobalProperties combine(GlobalProperties gp1, GlobalProperties gp2) {
 		if (gp1.isFullyReplicated()) {
 			if (gp2.isFullyReplicated()) {
 				return new GlobalProperties();
@@ -448,7 +482,7 @@ public class GlobalProperties implements Cloneable {
 		} else if (gp2.isFullyReplicated()) {
 			return gp1;
 		} else if (gp1.ordering != null) {
-			return gp1; 
+			return gp1;
 		} else if (gp2.ordering != null) {
 			return gp2;
 		} else if (gp1.partitioningFields != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
index f4334ff..10c1248 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.compiler.dataproperties;
 
+import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
@@ -27,14 +28,20 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.util.Utils;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 
 /**
- * This class represents global properties of the data that an operator is interested in, because it needs those
- * properties for its contract.
- * <p>
- * Currently, the properties are the following: A partitioning type (ANY, HASH, RANGE), and EITHER an ordering (for range partitioning)
- * or an FieldSet with the hash partitioning columns.
+ * This class represents the global properties of the data that are requested by an operator.
+ * Operators request the global properties they need for correct execution. This list is an example of global
+ * properties requested by certain operators:
+ * <ul>
+ *     <li>"groupBy/reduce" will request the data to be partitioned in some way after the key fields.</li>
+ *     <li>"map" will request the data to be in an arbitrary distribution - it has no prerequisites</li>
+ *     <li>"join" will request certain properties for each input. This class represents the properties
+ *         on an input alone. The properties may be partitioning on the key fields, or a combination of
+ *         replication on one input and anything-but-replication on the other input.</li>
+ * </ul>
  */
 public final class RequestedGlobalProperties implements Cloneable {
 	
@@ -60,11 +67,13 @@ public final class RequestedGlobalProperties implements Cloneable {
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Sets the partitioning property for the global properties.
-	 * If the partitionFields are provided as {@link FieldSet} also subsets are valid,
-	 * if provided as {@link FieldList} partitioning fields must exactly match incl. order.
+	 * Sets these properties to request a hash partitioning on the given fields.
 	 *
-	 * @param partitionedFields
+	 * If the fields are provided as {@link FieldSet}, then any permutation of the fields is a
+	 * valid partitioning, including subsets. If the fields are given as a {@link FieldList},
+	 * then only an exact partitioning on the fields matches this requested partitioning.
+	 *
+	 * @param partitionedFields The key fields for the partitioning.
 	 */
 	public void setHashPartitioned(FieldSet partitionedFields) {
 		if (partitionedFields == null) {
@@ -91,11 +100,14 @@ public final class RequestedGlobalProperties implements Cloneable {
 	}
 
 	/**
-	 * Sets the partitioning property for the global properties.
-	 * If the partitionFields are provided as {@link FieldSet} also subsets are valid,
-	 * if provided as {@link FieldList} partitioning fields must exactly match incl. order.
+	 * Sets these properties to request some partitioning on the given fields. This will allow
+	 * both hash partitioning and range partitioning to match.
+	 *
+	 * If the fields are provided as {@link FieldSet}, then any permutation of the fields is a
+	 * valid partitioning, including subsets. If the fields are given as a {@link FieldList},
+	 * then only an exact partitioning on the fields matches this requested partitioning.
 	 *
-	 * @param partitionedFields
+	 * @param partitionedFields The key fields for the partitioning.
 	 */
 	public void setAnyPartitioning(FieldSet partitionedFields) {
 		if (partitionedFields == null) {
@@ -131,11 +143,13 @@ public final class RequestedGlobalProperties implements Cloneable {
 	}
 
 	/**
-	 * Sets the partitioning property for the global properties.
-	 * If the partitionFields are provided as {@link FieldSet} also subsets are valid,
-	 * if provided as {@link FieldList} partitioning fields must exactly match incl. order.
+	 * Sets these properties to request a custom partitioning with the given {@link Partitioner} instance.
 	 *
-	 * @param partitionedFields
+	 * If the fields are provided as {@link FieldSet}, then any permutation of the fields is a
+	 * valid partitioning, including subsets. If the fields are given as a {@link FieldList},
+	 * then only an exact partitioning on the fields matches this requested partitioning.
+	 *
+	 * @param partitionedFields The key fields for the partitioning.
 	 */
 	public void setCustomPartitioned(FieldSet partitionedFields, Partitioner<?> partitioner) {
 		if (partitionedFields == null || partitioner == null) {
@@ -322,63 +336,102 @@ public final class RequestedGlobalProperties implements Cloneable {
 	}
 
 	/**
-	 * Parameterizes the ship strategy fields of a channel such that the channel produces the desired global properties.
+	 * Parametrizes the ship strategy fields of a channel such that the channel produces
+	 * the desired global properties.
 	 * 
-	 * @param channel The channel to parameterize.
-	 * @param globalDopChange
+	 * @param channel The channel to parametrize.
+	 * @param globalDopChange Flag indicating whether the degree of parallelism changes
+	 *                        between sender and receiver.
+	 * @param exchangeMode The mode of data exchange (pipelined, always batch,
+	 *                     batch only on shuffle, ...)
+	 * @param breakPipeline Indicates whether this data exchange should break
+	 *                      pipelines (unless pipelines are forced).
 	 */
-	public void parameterizeChannel(Channel channel, boolean globalDopChange) {
+	public void parameterizeChannel(Channel channel, boolean globalDopChange,
+									ExecutionMode exchangeMode, boolean breakPipeline) {
 
 		// safety check. Fully replicated input must be preserved.
-		if(channel.getSource().getGlobalProperties().isFullyReplicated() &&
+		if (channel.getSource().getGlobalProperties().isFullyReplicated() &&
 				!(this.partitioning == PartitioningProperty.FULL_REPLICATION ||
-					this.partitioning == PartitioningProperty.ANY_DISTRIBUTION)) {
-			throw new CompilerException("Fully replicated input must be preserved and may not be converted into another global property.");
+					this.partitioning == PartitioningProperty.ANY_DISTRIBUTION))
+		{
+			throw new CompilerException("Fully replicated input must be preserved " +
+					"and may not be converted into another global property.");
 		}
 
 		// if we request nothing, then we need no special strategy. forward, if the number of instances remains
 		// the same, randomly repartition otherwise
 		if (isTrivial() || this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) {
-			channel.setShipStrategy(globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD);
+			ShipStrategyType shipStrategy = globalDopChange ? ShipStrategyType.PARTITION_RANDOM :
+																ShipStrategyType.FORWARD;
+
+			DataExchangeMode em = DataExchangeMode.select(exchangeMode, shipStrategy, breakPipeline);
+			channel.setShipStrategy(shipStrategy, em);
 			return;
 		}
 		
 		final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
 		// if we have no global parallelism change, check if we have already compatible global properties
 		if (!globalDopChange && isMetBy(inGlobals)) {
-			channel.setShipStrategy(ShipStrategyType.FORWARD);
+			DataExchangeMode em = DataExchangeMode.select(exchangeMode, ShipStrategyType.FORWARD, breakPipeline);
+			channel.setShipStrategy(ShipStrategyType.FORWARD, em);
 			return;
 		}
 		
 		// if we fall through the conditions until here, we need to re-establish
+		ShipStrategyType shipType;
+		FieldList partitionKeys;
+		boolean[] sortDirection;
+		Partitioner<?> partitioner;
+
 		switch (this.partitioning) {
 			case FULL_REPLICATION:
-				channel.setShipStrategy(ShipStrategyType.BROADCAST);
+				shipType = ShipStrategyType.BROADCAST;
+				partitionKeys = null;
+				sortDirection = null;
+				partitioner = null;
 				break;
-			
+
 			case ANY_PARTITIONING:
 			case HASH_PARTITIONED:
-				channel.setShipStrategy(ShipStrategyType.PARTITION_HASH, Utils.createOrderedFromSet(this.partitioningFields));
+				shipType = ShipStrategyType.PARTITION_HASH;
+				partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
+				sortDirection = null;
+				partitioner = null;
 				break;
 			
 			case RANGE_PARTITIONED:
-				channel.setShipStrategy(ShipStrategyType.PARTITION_RANGE, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
-				if(this.dataDistribution != null) {
+				shipType = ShipStrategyType.PARTITION_RANGE;
+				partitionKeys = this.ordering.getInvolvedIndexes();
+				sortDirection = this.ordering.getFieldSortDirections();
+				partitioner = null;
+
+				if (this.dataDistribution != null) {
 					channel.setDataDistribution(this.dataDistribution);
 				}
 				break;
-			
+
 			case FORCED_REBALANCED:
-				channel.setShipStrategy(ShipStrategyType.PARTITION_FORCED_REBALANCE);
+				shipType = ShipStrategyType.PARTITION_FORCED_REBALANCE;
+				partitionKeys = null;
+				sortDirection = null;
+				partitioner = null;
 				break;
-				
+
 			case CUSTOM_PARTITIONING:
-				channel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, Utils.createOrderedFromSet(this.partitioningFields), this.customPartitioner);
+				shipType = ShipStrategyType.PARTITION_CUSTOM;
+				partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
+				sortDirection = null;
+				partitioner = this.customPartitioner;
 				break;
-				
+
 			default:
-				throw new CompilerException("Invalid partitioning to create through a data exchange: " + this.partitioning.name());
+				throw new CompilerException("Invalid partitioning to create through a data exchange: "
+											+ this.partitioning.name());
 		}
+
+		DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline);
+		channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
index 54885a7..ec38b47 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
@@ -31,6 +31,7 @@ import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
 import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 
@@ -45,24 +46,29 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
 		if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
 			// locally connected, directly instantiate
-			return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_GROUP_REDUCE);
+			return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")",
+											in, DriverStrategy.ALL_GROUP_REDUCE);
 		} else {
 			// non forward case. plug in a combiner
 			Channel toCombiner = new Channel(in.getSource());
-			toCombiner.setShipStrategy(ShipStrategyType.FORWARD);
+			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			
 			// create an input node for combine with same DOP as input node
 			GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
 
-			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_COMBINE);
+			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
+					"Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_COMBINE);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
 			
 			Channel toReducer = new Channel(combiner);
-			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), in.getShipStrategySortOrder());
+			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
+										in.getShipStrategySortOrder(), in.getDataExchangeMode());
+
 			toReducer.setLocalStrategy(in.getLocalStrategy(), in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-			return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")", toReducer, DriverStrategy.ALL_GROUP_REDUCE);
+			return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")",
+											toReducer, DriverStrategy.ALL_GROUP_REDUCE);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllReduceProperties.java
index 2bf757e..17fa318 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllReduceProperties.java
@@ -30,11 +30,11 @@ import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
 import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 
-public final class AllReduceProperties extends OperatorDescriptorSingle
-{
+public final class AllReduceProperties extends OperatorDescriptorSingle {
 
 	@Override
 	public DriverStrategy getStrategy() {
@@ -45,24 +45,30 @@ public final class AllReduceProperties extends OperatorDescriptorSingle
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
 		if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
 			// locally connected, directly instantiate
-			return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_REDUCE);
+			return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")",
+											in, DriverStrategy.ALL_REDUCE);
 		} else {
 			// non forward case. plug in a combiner
 			Channel toCombiner = new Channel(in.getSource());
-			toCombiner.setShipStrategy(ShipStrategyType.FORWARD);
+			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			
 			// create an input node for combine with same DOP as input node
 			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
 
-			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
+			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
+					"Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
 			
 			Channel toReducer = new Channel(combiner);
-			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), in.getShipStrategySortOrder());
-			toReducer.setLocalStrategy(in.getLocalStrategy(), in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-			return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", toReducer, DriverStrategy.ALL_REDUCE);
+			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
+										in.getShipStrategySortOrder(), in.getDataExchangeMode());
+			toReducer.setLocalStrategy(in.getLocalStrategy(), in.getLocalStrategyKeys(),
+										in.getLocalStrategySortOrder());
+
+			return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")",
+											toReducer, DriverStrategy.ALL_REDUCE);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
index fd263e6..7180845 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
@@ -35,6 +35,7 @@ import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
 import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -94,13 +95,16 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 				if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
 					throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
 				}
-				in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
+				in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
+									in.getLocalStrategySortOrder());
 			}
-			return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+			return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", in,
+											DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
 		} else {
 			// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
 			Channel toCombiner = new Channel(in.getSource());
-			toCombiner.setShipStrategy(ShipStrategyType.FORWARD);
+			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+
 			// create an input node for combine with same DOP as input node
 			GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
@@ -115,9 +119,13 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 			combiner.setDriverKeyInfo(this.keyList, 1);
 			
 			Channel toReducer = new Channel(combiner);
-			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), in.getShipStrategySortOrder());
-			toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-			return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")", toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
+									in.getShipStrategySortOrder(), in.getDataExchangeMode());
+			toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
+										in.getLocalStrategySortOrder());
+
+			return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")",
+											toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
index 000079d..3a054ff 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
@@ -33,6 +33,7 @@ import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
 import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -60,25 +61,32 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 		if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
 				(node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
 		{
-			return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")", in, DriverStrategy.SORTED_REDUCE, this.keyList);
+			return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")", in,
+											DriverStrategy.SORTED_REDUCE, this.keyList);
 		}
 		else {
 			// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
 			Channel toCombiner = new Channel(in.getSource());
-			toCombiner.setShipStrategy(ShipStrategyType.FORWARD);
+			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			
 			// create an input node for combine with same DOP as input node
 			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
 
-			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
+			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
+								"Combine ("+node.getPactContract().getName()+")", toCombiner,
+								DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
+
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
 			
 			Channel toReducer = new Channel(combiner);
-			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), in.getShipStrategySortOrder());
+			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
+										in.getShipStrategySortOrder(), in.getDataExchangeMode());
 			toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-			return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", toReducer, DriverStrategy.SORTED_REDUCE, this.keyList);
+
+			return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", toReducer,
+											DriverStrategy.SORTED_REDUCE, this.keyList);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
index e159481..3903c84 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
@@ -32,6 +32,7 @@ import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
 import org.apache.flink.compiler.dataproperties.RequestedLocalProperties;
 import org.apache.flink.compiler.plandump.DumpableConnection;
 import org.apache.flink.compiler.util.Utils;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 
@@ -43,8 +44,10 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	private PlanNode source;
 	
 	private PlanNode target;
-	
+
 	private ShipStrategyType shipStrategy = ShipStrategyType.NONE;
+
+	private DataExchangeMode dataExchangeMode;
 	
 	private LocalStrategy localStrategy = LocalStrategy.NONE;
 	
@@ -78,8 +81,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	
 	private double relativeTempMemory;
 	
-	private double relativeMemoryGlobalStrategy;
-	
 	private double relativeMemoryLocalStrategy;
 	
 	private int replicationFactor = 1;
@@ -125,33 +126,46 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	public PlanNode getTarget() {
 		return this.target;
 	}
-	
-	public void setShipStrategy(ShipStrategyType strategy) {
-		setShipStrategy(strategy, null, null, null);
+
+	public void setShipStrategy(ShipStrategyType strategy, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, null, null, null, dataExchangeMode);
 	}
 	
-	public void setShipStrategy(ShipStrategyType strategy, FieldList keys) {
-		setShipStrategy(strategy, keys, null, null);
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, keys, null, null, dataExchangeMode);
 	}
 	
-	public void setShipStrategy(ShipStrategyType strategy, FieldList keys, boolean[] sortDirection) {
-		setShipStrategy(strategy, keys, sortDirection, null);
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+								boolean[] sortDirection, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, keys, sortDirection, null, dataExchangeMode);
 	}
 	
-	public void setShipStrategy(ShipStrategyType strategy, FieldList keys, Partitioner<?> partitioner) {
-		setShipStrategy(strategy, keys, null, partitioner);
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+								Partitioner<?> partitioner, DataExchangeMode dataExchangeMode) {
+		setShipStrategy(strategy, keys, null, partitioner, dataExchangeMode);
 	}
 	
-	public void setShipStrategy(ShipStrategyType strategy, FieldList keys, boolean[] sortDirection, Partitioner<?> partitioner) {
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
+								boolean[] sortDirection, Partitioner<?> partitioner,
+								DataExchangeMode dataExchangeMode) {
 		this.shipStrategy = strategy;
 		this.shipKeys = keys;
 		this.shipSortOrder = sortDirection;
 		this.partitioner = partitioner;
-		
+		this.dataExchangeMode = dataExchangeMode;
 		this.globalProps = null;		// reset the global properties
 	}
-	
-	
+
+	/**
+	 * Gets the data exchange mode (batch / streaming) to use for the data
+	 * exchange of this channel.
+	 *
+	 * @return The data exchange mode of this channel.
+	 */
+	public DataExchangeMode getDataExchangeMode() {
+		return dataExchangeMode;
+	}
+
 	public ShipStrategyType getShipStrategy() {
 		return this.shipStrategy;
 	}
@@ -168,10 +182,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		setLocalStrategy(strategy, null, null);
 	}
 	
-	public void setLocalStrategy(LocalStrategy strategy, FieldList keys) {
-		setLocalStrategy(strategy, keys, null);
-	}
-	
 	public void setLocalStrategy(LocalStrategy strategy, FieldList keys, boolean[] sortDirection) {
 		this.localStrategy = strategy;
 		this.localKeys = keys;
@@ -307,14 +317,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		this.localStrategyComparator = localStrategyComparator;
 	}
 	
-	public double getRelativeMemoryGlobalStrategy() {
-		return relativeMemoryGlobalStrategy;
-	}
-	
-	public void setRelativeMemoryGlobalStrategy(double relativeMemoryGlobalStrategy) {
-		this.relativeMemoryGlobalStrategy = relativeMemoryGlobalStrategy;
-	}
-	
 	public double getRelativeMemoryLocalStrategy() {
 		return relativeMemoryLocalStrategy;
 	}
@@ -477,8 +479,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	
 	/**
 	 * Utility method used while swapping binary union nodes for n-ary union nodes.
-	 * 
-	 * @param newUnionNode
 	 */
 	public void swapUnionNodes(PlanNode newUnionNode) {
 		if (!(this.source instanceof BinaryUnionPlanNode)) {
@@ -493,16 +493,17 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	public int getMaxDepth() {
 		return this.source.getOptimizerNode().getMaxDepth() + 1;
 	}
-	// --------------------------------------------------------------------------------------------
 
-	
+	// --------------------------------------------------------------------------------------------
 
+	@Override
 	public String toString() {
 		return "Channel (" + this.source + (this.target == null ? ')' : ") -> (" + this.target + ')') +
 				'[' + this.shipStrategy + "] [" + this.localStrategy + "] " +
 				(this.tempMode == null || this.tempMode == TempMode.NONE ? "{NO-TEMP}" : this.tempMode);
 	}
-	
+
+	@Override
 	public Channel clone() {
 		try {
 			return (Channel) super.clone();

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java
index 00eb287..51d65e1 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.plan;
 
 import java.util.Collection;
@@ -26,64 +25,46 @@ import org.apache.flink.util.Visitable;
 import org.apache.flink.util.Visitor;
 
 /**
- * The optimizer representation of a plan. The optimizer creates this from the user defined PACT job plan.
- * It works on this representation during its optimization. Finally, this plan is translated to a schedule
- * for the nephele runtime system.
+ * The execution plan generated by the Optimizer. It contains {@link PlanNode}s
+ * and {@link Channel}s that describe exactly how the program should be executed.
+ * It defines all ship strategies (local pipe, shuffle, broadcast, rebalance), all
+ * operator strategies (sorting-merge join, hash join, sorted grouping, ...),
+ * and the data exchange modes (batched, pipelined).
  */
 public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode>  {
 	
-	/**
-	 * The data sources in the plan.
-	 */
+	/** The data sources in the plan. */
 	private final Collection<SourcePlanNode> dataSources;
 
-	/**
-	 * The data sinks in the plan.
-	 */
+	/** The data sinks in the plan. */
 	private final Collection<SinkPlanNode> dataSinks;
 
-	/**
-	 * All nodes in the optimizer plan.
-	 */
+	/** All nodes in the optimizer plan. */
 	private final Collection<PlanNode> allNodes;
 	
-	/**
-	 * The original pact plan.
-	 */
-	private final Plan pactPlan;
+	/** The original program. */
+	private final Plan originalProgram;
 
-	/**
-	 * Name of the PACT job
-	 */
+	/** Name of the job */
 	private final String jobName;
 
 	/**
-	 * The name of the instance type that is to be used.
-	 */
-	private String instanceTypeName;
-	
-	
-	/**
 	 * Creates a new instance of this optimizer plan container. The plan is given and fully
 	 * described by the data sources, sinks and the collection of all nodes.
 	 * 
-	 * @param sources
-	 *        The nodes describing the data sources.
-	 * @param sinks
-	 *        The nodes describing the data sinks.
-	 * @param allNodes
-	 *        A collection containing all nodes in the plan.
-	 * @param jobName
-	 *        The name of the PACT job
+	 * @param sources The data sources.
+	 * @param sinks The data sinks.
+	 * @param allNodes A collection containing all nodes in the plan.
+	 * @param jobName The name of the program
 	 */
 	public OptimizedPlan(Collection<SourcePlanNode> sources, Collection<SinkPlanNode> sinks,
-			Collection<PlanNode> allNodes, String jobName, Plan pactPlan)
+			Collection<PlanNode> allNodes, String jobName, Plan programPlan)
 	{
 		this.dataSources = sources;
 		this.dataSinks = sinks;
 		this.allNodes = allNodes;
 		this.jobName = jobName;
-		this.pactPlan = pactPlan;
+		this.originalProgram = programPlan;
 	}
 
 	/**
@@ -114,46 +95,27 @@ public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode>  {
 	}
 
 	/**
-	 * Returns the name of the optimized PACT job.
+	 * Returns the name of the program.
 	 * 
-	 * @return The name of the optimized PACT job.
+	 * @return The name of the program.
 	 */
 	public String getJobName() {
 		return this.jobName;
 	}
 	
 	/**
-	 * Gets the original pact plan from which this optimized plan was created.
+	 * Gets the original program plan from which this optimized plan was created.
 	 * 
-	 * @return The original pact plan.
+	 * @return The original program plan.
 	 */
 	public Plan getOriginalPactPlan() {
-		return this.pactPlan;
-	}
-
-	/**
-	 * Gets the name of the instance type that should be used for this PACT job.
-	 * 
-	 * @return The instance-type name.
-	 */
-	public String getInstanceTypeName() {
-		return instanceTypeName;
-	}
-
-	/**
-	 * Sets the name of the instance type that should be used for this PACT job.
-	 * 
-	 * @param instanceTypeName
-	 *        The name of the instance type.
-	 */
-	public void setInstanceTypeName(String instanceTypeName) {
-		this.instanceTypeName = instanceTypeName;
+		return this.originalProgram;
 	}
 
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Takes the given visitor and applies it top down to all nodes, starting at the sinks.
+	 * Applies the given visitor top down to all nodes, starting at the sinks.
 	 * 
 	 * @param visitor
 	 *        The visitor to apply to the nodes in this plan.
@@ -165,5 +127,4 @@ public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode>  {
 			node.accept(visitor);
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/AdditionalOperatorsTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/AdditionalOperatorsTest.java
index cf32126..07fc972 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/AdditionalOperatorsTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/AdditionalOperatorsTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler;
 
 import static org.junit.Assert.assertEquals;
@@ -28,7 +27,6 @@ import org.apache.flink.api.java.record.operators.CrossWithLargeOperator;
 import org.apache.flink.api.java.record.operators.CrossWithSmallOperator;
 import org.apache.flink.api.java.record.operators.FileDataSink;
 import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.plan.Channel;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
@@ -38,7 +36,6 @@ import org.apache.flink.compiler.util.DummyOutputFormat;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 
-
 /**
 * Tests that validate optimizer choices when using operators that are requesting certain specific execution
 * strategies.

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
index c6a9b55..beea0b9 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/CompilerTestBase.java
@@ -20,19 +20,16 @@ package org.apache.flink.compiler;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.compiler.costs.DefaultCostEstimator;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.PlanNode;
@@ -42,9 +39,10 @@ import org.apache.flink.util.Visitor;
 import org.junit.Before;
 
 /**
- *
+ * Base class for Optimizer tests. Offers utility methods to trigger optimization
+ * of a program and to fetch the nodes in an optimizer plan that correspond
+ * to the node in the program plan.
  */
-@SuppressWarnings("deprecation")
 public abstract class CompilerTestBase implements java.io.Serializable {
 	
 	private static final long serialVersionUID = 1L;
@@ -55,8 +53,6 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 	
 	protected static final int DEFAULT_PARALLELISM = 8;
 	
-	protected static final String DEFAULT_PARALLELISM_STRING = String.valueOf(DEFAULT_PARALLELISM);
-	
 	private static final String CACHE_KEY = "cachekey";
 	
 	// ------------------------------------------------------------------------
@@ -102,16 +98,11 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 	}
 	
 	// ------------------------------------------------------------------------
-	public static OperatorResolver getContractResolver(Plan plan) {
-		return new OperatorResolver(plan);
-	}
 	
 	public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) {
 		return new OptimizerPlanNodeResolver(plan);
 	}
 	
-	// ------------------------------------------------------------------------
-	
 	public static final class OptimizerPlanNodeResolver {
 		
 		private final Map<String, ArrayList<PlanNode>> map;
@@ -205,97 +196,6 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 			}
 		}
 	}
-	
-	// ------------------------------------------------------------------------
-	
-	public static final class OperatorResolver implements Visitor<Operator<?>> {
-		
-		private final Map<String, List<Operator<?>>> map;
-		private Set<Operator<?>> seen;
-		
-		OperatorResolver(Plan p) {
-			this.map = new HashMap<String, List<Operator<?>>>();
-			this.seen = new HashSet<Operator<?>>();
-			
-			p.accept(this);
-			this.seen = null;
-		}
-		
-		
-		@SuppressWarnings("unchecked")
-		public <T extends Operator<?>> T getNode(String name) {
-			List<Operator<?>> nodes = this.map.get(name);
-			if (nodes == null || nodes.isEmpty()) {
-				throw new RuntimeException("No nodes found with the given name.");
-			} else if (nodes.size() != 1) {
-				throw new RuntimeException("Multiple nodes found with the given name.");
-			} else {
-				return (T) nodes.get(0);
-			}
-		}
-		
-		@SuppressWarnings("unchecked")
-		public <T extends Operator<?>> T getNode(String name, Class<? extends Function> stubClass) {
-			List<Operator<?>> nodes = this.map.get(name);
-			if (nodes == null || nodes.isEmpty()) {
-				throw new RuntimeException("No node found with the given name and stub class.");
-			} else {
-				Operator<?> found = null;
-				for (Operator<?> node : nodes) {
-					if (node.getClass() == stubClass) {
-						if (found == null) {
-							found = node;
-						} else {
-							throw new RuntimeException("Multiple nodes found with the given name and stub class.");
-						}
-					}
-				}
-				if (found == null) {
-					throw new RuntimeException("No node found with the given name and stub class.");
-				} else {
-					return (T) found;
-				}
-			}
-		}
-		
-		public List<Operator<?>> getNodes(String name) {
-			List<Operator<?>> nodes = this.map.get(name);
-			if (nodes == null || nodes.isEmpty()) {
-				throw new RuntimeException("No node found with the given name.");
-			} else {
-				return new ArrayList<Operator<?>>(nodes);
-			}
-		}
-
-		@Override
-		public boolean preVisit(Operator<?> visitable) {
-			if (this.seen.add(visitable)) {
-				// add to  the map
-				final String name = visitable.getName();
-				List<Operator<?>> list = this.map.get(name);
-				if (list == null) {
-					list = new ArrayList<Operator<?>>(2);
-					this.map.put(name, list);
-				}
-				list.add(visitable);
-				
-				// recurse into bulk iterations
-				if (visitable instanceof BulkIteration) {
-					((BulkIteration) visitable).getNextPartialSolution().accept(this);
-				} else if (visitable instanceof DeltaIteration) {
-					((DeltaIteration) visitable).getSolutionSetDelta().accept(this);
-					((DeltaIteration) visitable).getNextWorkset().accept(this);
-				}
-				
-				return true;
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public void postVisit(Operator<?> visitable) {}
-	}
 
 	/**
 	 * Collects all DataSources of a plan to add statistics
@@ -311,8 +211,8 @@ public abstract class CompilerTestBase implements java.io.Serializable {
 			if(visitable instanceof GenericDataSourceBase) {
 				sources.add((GenericDataSourceBase<?, ?>) visitable);
 			}
-			else if(visitable instanceof BulkIteration) {
-				((BulkIteration) visitable).getNextPartialSolution().accept(this);
+			else if(visitable instanceof BulkIterationBase) {
+				((BulkIterationBase<?>) visitable).getNextPartialSolution().accept(this);
 			}
 			
 			return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
index 677d9be..b800279 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.compiler.plan.PlanNode.FeedbackPropertiesMeetRequirement
 import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.compiler.testfunctions.IdentityMapper;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -67,12 +68,12 @@ public class FeedbackPropertiesMatchTest {
 			SourcePlanNode otherTarget = new SourcePlanNode(getSourceNode(), "Source");
 			
 			Channel toMap1 = new Channel(target);
-			toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toMap1.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
 			
 			Channel toMap2 = new Channel(map1);
-			toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toMap2.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
 			
@@ -96,12 +97,12 @@ public class FeedbackPropertiesMatchTest {
 			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Source");
 			
 			Channel toMap1 = new Channel(target);
-			toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toMap1.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
 			
 			Channel toMap2 = new Channel(map1);
-			toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toMap2.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
 			
@@ -674,10 +675,10 @@ public class FeedbackPropertiesMatchTest {
 				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
 				reqGp.setAnyPartitioning(new FieldSet(2, 5));
 				
-				toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5));
+				toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5), DataExchangeMode.PIPELINED);
 				toMap1.setLocalStrategy(LocalStrategy.NONE);
 				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap2.setLocalStrategy(LocalStrategy.NONE);
 				
 				
@@ -700,10 +701,10 @@ public class FeedbackPropertiesMatchTest {
 				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
 				reqGp.setAnyPartitioning(new FieldSet(2, 5));
 				
-				toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap1.setLocalStrategy(LocalStrategy.NONE);
 				
-				toMap2.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5));
+				toMap2.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 5), DataExchangeMode.PIPELINED);
 				toMap2.setLocalStrategy(LocalStrategy.NONE);
 				
 				
@@ -726,10 +727,10 @@ public class FeedbackPropertiesMatchTest {
 				RequestedLocalProperties reqLp = new RequestedLocalProperties();
 				reqLp.setGroupedFields(new FieldList(4, 1));
 				
-				toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
 				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap2.setLocalStrategy(LocalStrategy.NONE);
 				
 				toMap1.setRequiredGlobalProps(null);
@@ -751,10 +752,10 @@ public class FeedbackPropertiesMatchTest {
 				RequestedLocalProperties reqLp = new RequestedLocalProperties();
 				reqLp.setGroupedFields(new FieldList(4, 1));
 				
-				toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap1.setLocalStrategy(LocalStrategy.NONE);
 				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap2.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
 				
 				
@@ -780,10 +781,10 @@ public class FeedbackPropertiesMatchTest {
 				RequestedLocalProperties reqLp = new RequestedLocalProperties();
 				reqLp.setGroupedFields(new FieldList(5, 7));
 				
-				toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(5, 7));
+				toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(5, 7), DataExchangeMode.PIPELINED);
 				toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
 				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap2.setLocalStrategy(LocalStrategy.NONE);
 				
 				toMap1.setRequiredGlobalProps(reqGp);
@@ -824,13 +825,13 @@ public class FeedbackPropertiesMatchTest {
 				RequestedLocalProperties reqLp = new RequestedLocalProperties();
 				reqLp.setGroupedFields(new FieldList(4, 1));
 				
-				toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap1.setLocalStrategy(LocalStrategy.SORT, new FieldList(5, 7), new boolean[] {false, false});
 				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap2.setLocalStrategy(LocalStrategy.NONE);
 				
-				toMap3.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap3.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap3.setLocalStrategy(LocalStrategy.NONE);
 				
 				toMap1.setRequiredGlobalProps(null);
@@ -855,13 +856,13 @@ public class FeedbackPropertiesMatchTest {
 				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
 				reqGp.setAnyPartitioning(new FieldSet(2, 3));
 				
-				toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(1, 2));
+				toMap1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(1, 2), DataExchangeMode.PIPELINED);
 				toMap1.setLocalStrategy(LocalStrategy.NONE);
 				
-				toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap2.setLocalStrategy(LocalStrategy.NONE);
 				
-				toMap3.setShipStrategy(ShipStrategyType.FORWARD);
+				toMap3.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toMap3.setLocalStrategy(LocalStrategy.NONE);
 				
 				toMap1.setRequiredGlobalProps(null);
@@ -892,21 +893,21 @@ public class FeedbackPropertiesMatchTest {
 			SourcePlanNode source2 = new SourcePlanNode(getSourceNode(), "Source 2");
 			
 			Channel toMap1 = new Channel(source1);
-			toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toMap1.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
 			
 			Channel toMap2 = new Channel(source2);
-			toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toMap2.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
 			
 			Channel toJoin1 = new Channel(map1);
 			Channel toJoin2 = new Channel(map2);
 			
-			toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+			toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toJoin1.setLocalStrategy(LocalStrategy.NONE);
-			toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+			toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toJoin2.setLocalStrategy(LocalStrategy.NONE);
 			
 			DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -927,12 +928,12 @@ public class FeedbackPropertiesMatchTest {
 			SourcePlanNode source = new SourcePlanNode(getSourceNode(), "Other Source");
 			
 			Channel toMap1 = new Channel(target);
-			toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toMap1.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
 			
 			Channel toMap2 = new Channel(source);
-			toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toMap2.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
 			
@@ -942,13 +943,13 @@ public class FeedbackPropertiesMatchTest {
 			DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
 			
 			Channel toAfterJoin = new Channel(join);
-			toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+			toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
 			
 			// attach some properties to the non-relevant input
 			{
-				toMap2.setShipStrategy(ShipStrategyType.BROADCAST);
+				toMap2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
 				toMap2.setLocalStrategy(LocalStrategy.SORT, new FieldList(2, 7), new boolean[] {true, true});
 				
 				RequestedGlobalProperties joinGp = new RequestedGlobalProperties();
@@ -957,7 +958,7 @@ public class FeedbackPropertiesMatchTest {
 				RequestedLocalProperties joinLp = new RequestedLocalProperties();
 				joinLp.setOrdering(new Ordering(2, null, Order.ASCENDING).appendOrdering(7, null, Order.ASCENDING));
 				
-				toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toJoin2.setLocalStrategy(LocalStrategy.NONE);
 				toJoin2.setRequiredGlobalProps(joinGp);
 				toJoin2.setRequiredLocalProps(joinLp);
@@ -967,7 +968,7 @@ public class FeedbackPropertiesMatchTest {
 			
 			// no properties from the partial solution, no required properties
 			{
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toJoin1.setLocalStrategy(LocalStrategy.NONE);
 				
 				GlobalProperties gp = new GlobalProperties();
@@ -979,7 +980,7 @@ public class FeedbackPropertiesMatchTest {
 			
 			// some properties from the partial solution, no required properties
 			{
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toJoin1.setLocalStrategy(LocalStrategy.NONE);
 				
 				GlobalProperties gp = new GlobalProperties();
@@ -1006,7 +1007,7 @@ public class FeedbackPropertiesMatchTest {
 				toJoin1.setRequiredGlobalProps(rgp);
 				toJoin1.setRequiredLocalProps(rlp);
 				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toJoin1.setLocalStrategy(LocalStrategy.NONE);
 				
 				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
@@ -1028,7 +1029,7 @@ public class FeedbackPropertiesMatchTest {
 				toJoin1.setRequiredGlobalProps(rgp);
 				toJoin1.setRequiredLocalProps(rlp);
 				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toJoin1.setLocalStrategy(LocalStrategy.NONE);
 				
 				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
@@ -1053,7 +1054,7 @@ public class FeedbackPropertiesMatchTest {
 				toJoin1.setRequiredGlobalProps(null);
 				toJoin1.setRequiredLocalProps(null);
 				
-				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1));
+				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1), DataExchangeMode.PIPELINED);
 				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
 				
 				FeedbackPropertiesMeetRequirementsReport report = join.checkPartialSolutionPropertiesMet(target, gp, lp);
@@ -1075,13 +1076,13 @@ public class FeedbackPropertiesMatchTest {
 				toMap1.setRequiredGlobalProps(null);
 				toMap1.setRequiredLocalProps(null);
 				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toJoin1.setLocalStrategy(LocalStrategy.NONE);
 				
 				toJoin1.setRequiredGlobalProps(rgp);
 				toJoin1.setRequiredLocalProps(rlp);
 			
-				toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+				toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
 				
 				toAfterJoin.setRequiredGlobalProps(rgp);
@@ -1112,7 +1113,7 @@ public class FeedbackPropertiesMatchTest {
 				toJoin1.setRequiredGlobalProps(rgp1);
 				toJoin1.setRequiredLocalProps(rlp1);
 			
-				toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+				toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
 				
 				toAfterJoin.setRequiredGlobalProps(rgp2);
@@ -1137,7 +1138,7 @@ public class FeedbackPropertiesMatchTest {
 				toJoin1.setRequiredGlobalProps(null);
 				toJoin1.setRequiredLocalProps(null);
 				
-				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1));
+				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(2, 1), DataExchangeMode.PIPELINED);
 				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
 				
 				toAfterJoin.setRequiredGlobalProps(rgp);
@@ -1159,7 +1160,7 @@ public class FeedbackPropertiesMatchTest {
 				toJoin1.setRequiredGlobalProps(null);
 				toJoin1.setRequiredLocalProps(null);
 				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
 				
 				toAfterJoin.setRequiredGlobalProps(null);
@@ -1184,7 +1185,7 @@ public class FeedbackPropertiesMatchTest {
 				toJoin1.setRequiredGlobalProps(null);
 				toJoin1.setRequiredLocalProps(null);
 				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(7, 3), new boolean[] {true, false});
 				
 				toAfterJoin.setRequiredGlobalProps(rgp);
@@ -1206,27 +1207,27 @@ public class FeedbackPropertiesMatchTest {
 			SourcePlanNode target = new SourcePlanNode(getSourceNode(), "Partial Solution");
 			
 			Channel toMap1 = new Channel(target);
-			toMap1.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toMap1.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode map1 = new SingleInputPlanNode(getMapNode(), "Mapper 1", toMap1, DriverStrategy.MAP);
 			
 			Channel toMap2 = new Channel(target);
-			toMap2.setShipStrategy(ShipStrategyType.FORWARD);
+			toMap2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toMap2.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode map2 = new SingleInputPlanNode(getMapNode(), "Mapper 2", toMap2, DriverStrategy.MAP);
 			
 			Channel toJoin1 = new Channel(map1);
-			toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+			toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toJoin1.setLocalStrategy(LocalStrategy.NONE);
 			
 			Channel toJoin2 = new Channel(map2);
-			toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+			toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toJoin2.setLocalStrategy(LocalStrategy.NONE);
 			
 			DualInputPlanNode join = new DualInputPlanNode(getJoinNode(), "Join", toJoin1, toJoin2, DriverStrategy.HYBRIDHASH_BUILD_FIRST);
 			
 			Channel toAfterJoin = new Channel(join);
-			toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD);
+			toAfterJoin.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 			toAfterJoin.setLocalStrategy(LocalStrategy.NONE);
 			SingleInputPlanNode afterJoin = new SingleInputPlanNode(getMapNode(), "After Join Mapper", toAfterJoin, DriverStrategy.MAP);
 			
@@ -1336,8 +1337,8 @@ public class FeedbackPropertiesMatchTest {
 				toJoin2.setRequiredGlobalProps(null);
 				toJoin2.setRequiredLocalProps(null);
 				
-				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88));
-				toJoin2.setShipStrategy(ShipStrategyType.BROADCAST);
+				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED);
+				toJoin2.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
 				
 				toAfterJoin.setRequiredGlobalProps(rgp);
 				toAfterJoin.setRequiredLocalProps(rlp);
@@ -1358,8 +1359,8 @@ public class FeedbackPropertiesMatchTest {
 				RequestedLocalProperties rlp = new RequestedLocalProperties();
 				rlp.setGroupedFields(new FieldList(2, 1));
 				
-				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88));
-				toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED);
+				toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				
 				toAfterJoin.setRequiredGlobalProps(rgp);
 				toAfterJoin.setRequiredLocalProps(rlp);
@@ -1380,8 +1381,8 @@ public class FeedbackPropertiesMatchTest {
 				RequestedLocalProperties rlp = new RequestedLocalProperties();
 				rlp.setGroupedFields(new FieldList(77, 69));
 				
-				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88));
-				toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(88), DataExchangeMode.PIPELINED);
+				toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				
 				toAfterJoin.setRequiredGlobalProps(rgp);
 				toAfterJoin.setRequiredLocalProps(rlp);
@@ -1400,10 +1401,10 @@ public class FeedbackPropertiesMatchTest {
 				rgp.setHashPartitioned(new FieldList(3));
 				
 				
-				toJoin1.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin1.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toJoin1.setLocalStrategy(LocalStrategy.SORT, new FieldList(3), new boolean[] { false });
 				
-				toJoin2.setShipStrategy(ShipStrategyType.FORWARD);
+				toJoin2.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 				toJoin1.setLocalStrategy(LocalStrategy.NONE);
 				
 				toAfterJoin.setRequiredGlobalProps(rgp);
@@ -1421,15 +1422,15 @@ public class FeedbackPropertiesMatchTest {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	private static final DataSourceNode getSourceNode() {
+	private static DataSourceNode getSourceNode() {
 		return new DataSourceNode(new GenericDataSourceBase<String, TextInputFormat>(new TextInputFormat(new Path("/")), new OperatorInformation<String>(BasicTypeInfo.STRING_TYPE_INFO)));
 	}
 	
-	private static final MapNode getMapNode() {
+	private static MapNode getMapNode() {
 		return new MapNode(new MapOperatorBase<String, String, MapFunction<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
 	}
 	
-	private static final JoinNode getJoinNode() {
+	private static JoinNode getJoinNode() {
 		return new JoinNode(new JoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/costs/DefaultCostEstimatorTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/costs/DefaultCostEstimatorTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/costs/DefaultCostEstimatorTest.java
index 3c28a3a..01404ac 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/costs/DefaultCostEstimatorTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/costs/DefaultCostEstimatorTest.java
@@ -21,9 +21,6 @@ package org.apache.flink.compiler.costs;
 
 import static org.junit.Assert.*;
 
-import org.apache.flink.compiler.costs.CostEstimator;
-import org.apache.flink.compiler.costs.Costs;
-import org.apache.flink.compiler.costs.DefaultCostEstimator;
 import org.apache.flink.compiler.dag.EstimateProvider;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1c50d87c/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeClosedBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeClosedBranchingTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeClosedBranchingTest.java
new file mode 100644
index 0000000..a78336c
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataexchange/DataExchangeModeClosedBranchingTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dataexchange;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.compiler.testfunctions.IdentityFlatMapper;
+import org.apache.flink.compiler.testfunctions.SelectOneReducer;
+import org.apache.flink.compiler.testfunctions.Top1GroupReducer;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This test checks the correct assignment of the DataExchangeMode to
+ * connections for programs that branch, and re-join those branches.
+ *
+ * <pre>
+ *                                         /-> (sink)
+ *                                        /
+ *                         /-> (reduce) -+          /-> (flatmap) -> (sink)
+ *                        /               \        /
+ *     (source) -> (map) -                (join) -+-----\
+ *                        \               /              \
+ *                         \-> (filter) -+                \
+ *                                       \                (co group) -> (sink)
+ *                                        \                /
+ *                                         \-> (reduce) - /
+ * </pre>
+ */
+@SuppressWarnings("serial")
+public class DataExchangeModeClosedBranchingTest extends CompilerTestBase {
+
+	@Test
+	public void testPipelinedForced() {
+		// PIPELINED_FORCED should result in pipelining all the way
+		verifyBranchingJoiningPlan(ExecutionMode.PIPELINED_FORCED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED,
+				DataExchangeMode.PIPELINED, DataExchangeMode.PIPELINED);
+	}
+
+	@Test
+	public void testPipelined() {
+		// PIPELINED pipelines where possible, but falls back to BATCH on the exchanges that must break potential deadlocks in the re-joined branches (see per-connection comments below)
+		verifyBranchingJoiningPlan(ExecutionMode.PIPELINED,
+				DataExchangeMode.PIPELINED,   // to map
+				DataExchangeMode.PIPELINED,   // to reduce combiner (combiner connections are always pipelined)
+				DataExchangeMode.BATCH,       // to reduce
+				DataExchangeMode.BATCH,       // to filter
+				DataExchangeMode.PIPELINED,   // to sink after reduce
+				DataExchangeMode.PIPELINED,   // to join (first input)
+				DataExchangeMode.BATCH,       // to join (second input)
+				DataExchangeMode.PIPELINED,   // combiner connections are pipelined
+				DataExchangeMode.BATCH,       // to other reducer
+				DataExchangeMode.PIPELINED,   // to flatMap
+				DataExchangeMode.PIPELINED,   // to sink after flatMap
+				DataExchangeMode.PIPELINED,   // to coGroup (first input)
+				DataExchangeMode.PIPELINED,   // to coGroup (second input)
+				DataExchangeMode.PIPELINED    // to sink after coGroup
+		);
+	}
+
+	@Test
+	public void testBatch() {
+		// BATCH should result in batching the shuffle all the way
+		verifyBranchingJoiningPlan(ExecutionMode.BATCH,
+				DataExchangeMode.PIPELINED,   // to map
+				DataExchangeMode.PIPELINED,   // to reduce combiner (combiner connections are always pipelined)
+				DataExchangeMode.BATCH,       // to reduce
+				DataExchangeMode.BATCH,       // to filter
+				DataExchangeMode.PIPELINED,   // to sink after reduce
+				DataExchangeMode.BATCH,       // to join (first input)
+				DataExchangeMode.BATCH,       // to join (second input)
+				DataExchangeMode.PIPELINED,   // combiner connections are pipelined
+				DataExchangeMode.BATCH,       // to other reducer
+				DataExchangeMode.PIPELINED,   // to flatMap
+				DataExchangeMode.PIPELINED,   // to sink after flatMap
+				DataExchangeMode.BATCH,       // to coGroup (first input)
+				DataExchangeMode.BATCH,       // to coGroup (second input)
+				DataExchangeMode.PIPELINED    // to sink after coGroup
+		);
+	}
+
+	@Test
+	public void testBatchForced() {
+		// BATCH_FORCED should result in batching all the way
+		verifyBranchingJoiningPlan(ExecutionMode.BATCH_FORCED,
+				DataExchangeMode.BATCH,       // to map
+				DataExchangeMode.PIPELINED,   // to reduce combiner (combiner connections are always pipelined)
+				DataExchangeMode.BATCH,       // to reduce
+				DataExchangeMode.BATCH,       // to filter
+				DataExchangeMode.BATCH,       // to sink after reduce
+				DataExchangeMode.BATCH,       // to join (first input)
+				DataExchangeMode.BATCH,       // to join (second input)
+				DataExchangeMode.PIPELINED,   // combiner connections are pipelined
+				DataExchangeMode.BATCH,       // to other reducer
+				DataExchangeMode.BATCH,       // to flatMap
+				DataExchangeMode.BATCH,       // to sink after flatMap
+				DataExchangeMode.BATCH,       // to coGroup (first input)
+				DataExchangeMode.BATCH,       // to coGroup (second input)
+				DataExchangeMode.BATCH        // to sink after coGroup
+		);
+	}
+
+	private void verifyBranchingJoiningPlan(ExecutionMode execMode,
+											DataExchangeMode toMap,
+											DataExchangeMode toReduceCombiner,
+											DataExchangeMode toReduce,
+											DataExchangeMode toFilter,
+											DataExchangeMode toReduceSink,
+											DataExchangeMode toJoin1,
+											DataExchangeMode toJoin2,
+											DataExchangeMode toOtherReduceCombiner,
+											DataExchangeMode toOtherReduce,
+											DataExchangeMode toFlatMap,
+											DataExchangeMode toFlatMapSink,
+											DataExchangeMode toCoGroup1,
+											DataExchangeMode toCoGroup2,
+											DataExchangeMode toCoGroupSink)
+	{
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.getConfig().setExecutionMode(execMode);
+
+			DataSet<Tuple2<Long, Long>> data = env.fromElements(33L, 44L)
+					.map(new MapFunction<Long, Tuple2<Long, Long>>() {
+						@Override
+						public Tuple2<Long, Long> map(Long value) {
+							return new Tuple2<Long, Long>(value, value);
+						}
+					});
+
+			DataSet<Tuple2<Long, Long>> reduced = data.groupBy(0).reduce(new SelectOneReducer<Tuple2<Long, Long>>());
+			reduced.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("reduceSink");
+
+			DataSet<Tuple2<Long, Long>> filtered = data.filter(new FilterFunction<Tuple2<Long, Long>>() {
+				@Override
+				public boolean filter(Tuple2<Long, Long> value) throws Exception {
+					return false;
+				}
+			});
+
+			DataSet<Tuple2<Long, Long>> joined = reduced.join(filtered)
+					.where(1).equalTo(1)
+					.with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
+
+			joined.flatMap(new IdentityFlatMapper<Tuple2<Long, Long>>())
+					.output(new DiscardingOutputFormat<Tuple2<Long, Long>>()).name("flatMapSink");
+
+			joined.coGroup(filtered.groupBy(1).reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>()))
+					.where(0).equalTo(0)
+					.with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
+					.output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>()).name("cgSink");
+
+
+			OptimizedPlan optPlan = compileNoStats(env.createProgramPlan());
+
+			SinkPlanNode reduceSink = findSink(optPlan.getDataSinks(), "reduceSink");
+			SinkPlanNode flatMapSink = findSink(optPlan.getDataSinks(), "flatMapSink");
+			SinkPlanNode cgSink = findSink(optPlan.getDataSinks(), "cgSink");
+
+			DualInputPlanNode coGroupNode = (DualInputPlanNode) cgSink.getPredecessor();
+
+			DualInputPlanNode joinNode = (DualInputPlanNode) coGroupNode.getInput1().getSource();
+			SingleInputPlanNode otherReduceNode = (SingleInputPlanNode) coGroupNode.getInput2().getSource();
+			SingleInputPlanNode otherReduceCombinerNode = (SingleInputPlanNode) otherReduceNode.getPredecessor();
+
+			SingleInputPlanNode reduceNode = (SingleInputPlanNode) joinNode.getInput1().getSource();
+			SingleInputPlanNode reduceCombinerNode = (SingleInputPlanNode) reduceNode.getPredecessor();
+			assertEquals(reduceNode, reduceSink.getPredecessor());
+
+			SingleInputPlanNode filterNode = (SingleInputPlanNode) joinNode.getInput2().getSource();
+			assertEquals(filterNode, otherReduceCombinerNode.getPredecessor());
+
+			SingleInputPlanNode mapNode = (SingleInputPlanNode) filterNode.getPredecessor();
+			assertEquals(mapNode, reduceCombinerNode.getPredecessor());
+
+			SingleInputPlanNode flatMapNode = (SingleInputPlanNode) flatMapSink.getPredecessor();
+			assertEquals(joinNode, flatMapNode.getPredecessor());
+
+			// verify the data exchange modes
+
+			assertEquals(toReduceSink, reduceSink.getInput().getDataExchangeMode());
+			assertEquals(toFlatMapSink, flatMapSink.getInput().getDataExchangeMode());
+			assertEquals(toCoGroupSink, cgSink.getInput().getDataExchangeMode());
+
+			assertEquals(toCoGroup1, coGroupNode.getInput1().getDataExchangeMode());
+			assertEquals(toCoGroup2, coGroupNode.getInput2().getDataExchangeMode());
+
+			assertEquals(toJoin1, joinNode.getInput1().getDataExchangeMode());
+			assertEquals(toJoin2, joinNode.getInput2().getDataExchangeMode());
+
+			assertEquals(toOtherReduce, otherReduceNode.getInput().getDataExchangeMode());
+			assertEquals(toOtherReduceCombiner, otherReduceCombinerNode.getInput().getDataExchangeMode());
+
+			assertEquals(toFlatMap, flatMapNode.getInput().getDataExchangeMode());
+
+			assertEquals(toFilter, filterNode.getInput().getDataExchangeMode());
+			assertEquals(toReduce, reduceNode.getInput().getDataExchangeMode());
+			assertEquals(toReduceCombiner, reduceCombinerNode.getInput().getDataExchangeMode());
+
+			assertEquals(toMap, mapNode.getInput().getDataExchangeMode());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private SinkPlanNode findSink(Collection<SinkPlanNode> collection, String name) {
+		for (SinkPlanNode node : collection) {
+			String nodeName = node.getOptimizerNode().getPactContract().getName();
+			if (nodeName != null && nodeName.equals(name)) {
+				return node;
+			}
+		}
+
+		throw new IllegalArgumentException("No node with that name was found.");
+	}
+}


Mime
View raw message