flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [4/4] flink git commit: [FLINK-1443] Added support for replicated data sources. Introduced new PartitioningProperty for any distribution (random partitioning or full replication).
Date Thu, 05 Feb 2015 10:31:23 GMT
[FLINK-1443] Added support for replicated data sources.
Introduced new PartitioningProperty for any distribution (random partitioning or full replication).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a19b4a02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a19b4a02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a19b4a02

Branch: refs/heads/master
Commit: a19b4a02bfa5237e0dcd2b264da36229546f23c0
Parents: 7452802
Author: Fabian Hueske <fhueske@apache.org>
Authored: Tue Feb 3 16:03:07 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Thu Feb 5 11:18:04 2015 +0100

----------------------------------------------------------------------
 .../flink/compiler/dag/DataSourceNode.java      |  41 +-
 .../flink/compiler/dag/SingleInputNode.java     |  24 +-
 .../apache/flink/compiler/dag/TwoInputNode.java |  33 ++
 .../dataproperties/GlobalProperties.java        |  17 +-
 .../dataproperties/PartitioningProperty.java    |  11 +-
 .../RequestedGlobalProperties.java              |  35 +-
 .../operators/AbstractJoinDescriptor.java       |   2 +-
 .../operators/AllGroupReduceProperties.java     |   2 +-
 .../AllGroupWithPartialPreGroupProperties.java  |   2 +-
 .../operators/CartesianProductDescriptor.java   |   2 +-
 .../compiler/operators/CoGroupDescriptor.java   |   2 +-
 .../operators/CollectorMapDescriptor.java       |   6 +-
 .../compiler/operators/FilterDescriptor.java    |   4 +-
 .../compiler/operators/FlatMapDescriptor.java   |   6 +-
 .../operators/GroupReduceProperties.java        |   2 +-
 .../GroupReduceWithCombineProperties.java       |   2 +-
 .../flink/compiler/operators/MapDescriptor.java |   4 +-
 .../operators/MapPartitionDescriptor.java       |   4 +-
 .../operators/OperatorDescriptorSingle.java     |   8 +-
 .../operators/PartialGroupProperties.java       |   2 +-
 .../compiler/operators/ReduceProperties.java    |   2 +-
 .../apache/flink/compiler/DOPChangeTest.java    |  10 +-
 .../compiler/ReplicatingDataSourceTest.java     | 495 +++++++++++++++++++
 .../GlobalPropertiesFilteringTest.java          |  12 +-
 .../RequestedGlobalPropertiesFilteringTest.java |   2 +-
 .../api/common/io/ReplicatingInputFormat.java   | 115 +++++
 .../io/ReplicatingInputSplitAssigner.java       |  79 +++
 .../ReplicatingDataSourceITCase.java            | 141 ++++++
 28 files changed, 1006 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
index 10c77ca..af2a92b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.io.ReplicatingInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.Operator;
@@ -48,6 +49,8 @@ public class DataSourceNode extends OptimizerNode {
 	
 	private final boolean sequentialInput;
 
+	private final boolean replicatedInput;
+
 	/**
 	 * Creates a new DataSourceNode for the given contract.
 	 * 
@@ -67,6 +70,12 @@ public class DataSourceNode extends OptimizerNode {
 		} else {
 			this.sequentialInput = false;
 		}
+
+		if (pactContract.getUserCodeWrapper().getUserCodeObject() instanceof ReplicatingInputFormat) {
+			this.replicatedInput = true;
+		} else {
+			this.replicatedInput = false;
+		}
 	}
 
 	/**
@@ -174,17 +183,31 @@ public class DataSourceNode extends OptimizerNode {
 		if (this.cachedPlans != null) {
 			return this.cachedPlans;
 		}
-		
+
 		SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getPactContract().getName()+")");
-		candidate.updatePropertiesWithUniqueSets(getUniqueFields());
-		
-		final Costs costs = new Costs();
-		if (FileInputFormat.class.isAssignableFrom(getPactContract().getFormatWrapper().getUserCodeClass()) &&
-				this.estimatedOutputSize >= 0)
-		{
-			estimator.addFileInputCost(this.estimatedOutputSize, costs);
+
+		if(!replicatedInput) {
+			candidate.updatePropertiesWithUniqueSets(getUniqueFields());
+
+			final Costs costs = new Costs();
+			if (FileInputFormat.class.isAssignableFrom(getPactContract().getFormatWrapper().getUserCodeClass()) &&
+					this.estimatedOutputSize >= 0) {
+				estimator.addFileInputCost(this.estimatedOutputSize, costs);
+			}
+			candidate.setCosts(costs);
+		} else {
+			// replicated input
+			final Costs costs = new Costs();
+			InputFormat<?,?> inputFormat =
+					((ReplicatingInputFormat<?,?>)getPactContract().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
+			if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) &&
+					this.estimatedOutputSize >= 0) {
+				estimator.addFileInputCost(this.estimatedOutputSize * this.getDegreeOfParallelism(), costs);
+			}
+			candidate.setCosts(costs);
+
+			candidate.getGlobalProperties().setFullyReplicated();
 		}
-		candidate.setCosts(costs);
 
 		// since there is only a single plan for the data-source, return a list with that element only
 		List<PlanNode> plans = new ArrayList<PlanNode>(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
index e7e82fc..70c4291 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
@@ -240,6 +240,8 @@ public abstract class SingleInputNode extends OptimizerNode {
 			return this.cachedPlans;
 		}
 
+		boolean childrenSkippedDueToReplicatedInput = false;
+
 		// calculate alternative sub-plans for predecessor
 		final List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
 		final Set<RequestedGlobalProperties> intGlobal = this.inConn.getInterestingProperties().getGlobalProperties();
@@ -279,6 +281,18 @@ public abstract class SingleInputNode extends OptimizerNode {
 
 		// create all candidates
 		for (PlanNode child : subPlans) {
+
+			if(child.getGlobalProperties().isFullyReplicated()) {
+				// fully replicated input is always locally forwarded if DOP is not changed
+				if(dopChange) {
+					// can not continue with this child
+					childrenSkippedDueToReplicatedInput = true;
+					continue;
+				} else {
+					this.inConn.setShipStrategy(ShipStrategyType.FORWARD);
+				}
+			}
+
 			if (this.inConn.getShipStrategy() == null) {
 				// pick the strategy ourselves
 				for (RequestedGlobalProperties igps: intGlobal) {
@@ -325,7 +339,15 @@ public abstract class SingleInputNode extends OptimizerNode {
 				}
 			}
 		}
-		
+
+		if(outputPlans.isEmpty()) {
+			if(childrenSkippedDueToReplicatedInput) {
+				throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Invalid use of replicated input.");
+			} else {
+				throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints.");
+			}
+		}
+
 		// cost and prune the plans
 		for (PlanNode node : outputPlans) {
 			estimator.costOperator(node);

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
index 5e9a980..b391890 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/TwoInputNode.java
@@ -300,6 +300,8 @@ public abstract class TwoInputNode extends OptimizerNode {
 			return this.cachedPlans;
 		}
 
+		boolean childrenSkippedDueToReplicatedInput = false;
+
 		// step down to all producer nodes and calculate alternative plans
 		final List<? extends PlanNode> subPlans1 = getFirstPredecessorNode().getAlternativePlans(estimator);
 		final List<? extends PlanNode> subPlans2 = getSecondPredecessorNode().getAlternativePlans(estimator);
@@ -353,7 +355,30 @@ public abstract class TwoInputNode extends OptimizerNode {
 		
 		// create all candidates
 		for (PlanNode child1 : subPlans1) {
+
+			if(child1.getGlobalProperties().isFullyReplicated()) {
+				// fully replicated input is always locally forwarded if DOP is not changed
+				if(dopChange1) {
+					// can not continue with this child
+					childrenSkippedDueToReplicatedInput = true;
+					continue;
+				} else {
+					this.input1.setShipStrategy(ShipStrategyType.FORWARD);
+				}
+			}
+
 			for (PlanNode child2 : subPlans2) {
+
+				if(child2.getGlobalProperties().isFullyReplicated()) {
+					// fully replicated input is always locally forwarded if DOP is not changed
+					if(dopChange2) {
+						// can not continue with this child
+						childrenSkippedDueToReplicatedInput = true;
+						continue;
+					} else {
+						this.input2.setShipStrategy(ShipStrategyType.FORWARD);
+					}
+				}
 				
 				// check that the children go together. that is the case if they build upon the same
 				// candidate at the joined branch plan. 
@@ -457,6 +482,14 @@ public abstract class TwoInputNode extends OptimizerNode {
 			}
 		}
 
+		if(outputPlans.isEmpty()) {
+			if(childrenSkippedDueToReplicatedInput) {
+				throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Invalid use of replicated input.");
+			} else {
+				throw new CompilerException("No plan meeting the requirements could be created @ " + this + ". Most likely reason: Too restrictive plan hints.");
+			}
+		}
+
 		// cost and prune the plans
 		for (PlanNode node : outputPlans) {
 			estimator.costOperator(node);

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
index fb1f1a2..ca7e64d 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
@@ -61,7 +61,7 @@ public class GlobalProperties implements Cloneable {
 	 * Initializes the global properties with no partitioning.
 	 */
 	public GlobalProperties() {
-		this.partitioning = PartitioningProperty.RANDOM;
+		this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -102,8 +102,8 @@ public class GlobalProperties implements Cloneable {
 		this.ordering = null;
 	}
 	
-	public void setRandomDistribution() {
-		this.partitioning = PartitioningProperty.RANDOM;
+	public void setRandomPartitioned() {
+		this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
 		this.partitioningFields = null;
 		this.ordering = null;
 	}
@@ -224,14 +224,14 @@ public class GlobalProperties implements Cloneable {
 	 * Checks, if the properties in this object are trivial, i.e. only standard values.
 	 */
 	public boolean isTrivial() {
-		return partitioning == PartitioningProperty.RANDOM;
+		return partitioning == PartitioningProperty.RANDOM_PARTITIONED;
 	}
 
 	/**
 	 * This method resets the properties to a state where no properties are given.
 	 */
 	public void reset() {
-		this.partitioning = PartitioningProperty.RANDOM;
+		this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
 		this.ordering = null;
 		this.partitioningFields = null;
 	}
@@ -254,8 +254,6 @@ public class GlobalProperties implements Cloneable {
 
 		// filter partitioning
 		switch(this.partitioning) {
-			case FULL_REPLICATION:
-				return gp;
 			case RANGE_PARTITIONED:
 				// check if ordering is preserved
 				Ordering newOrdering = new Ordering();
@@ -308,7 +306,8 @@ public class GlobalProperties implements Cloneable {
 				}
 				break;
 			case FORCED_REBALANCED:
-			case RANDOM:
+			case FULL_REPLICATION:
+			case RANDOM_PARTITIONED:
 				gp.partitioning = this.partitioning;
 				break;
 			default:
@@ -350,7 +349,7 @@ public class GlobalProperties implements Cloneable {
 
 	public void parameterizeChannel(Channel channel, boolean globalDopChange) {
 		switch (this.partitioning) {
-			case RANDOM:
+			case RANDOM_PARTITIONED:
 				channel.setShipStrategy(globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD);
 				break;
 			case FULL_REPLICATION:

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
index 47cd6b8..2b66ea0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
@@ -22,11 +22,16 @@ package org.apache.flink.compiler.dataproperties;
  * An enumeration tracking the different types of sharding strategies.
  */
 public enum PartitioningProperty {
-	
+
+	/**
+	 * Any data distribution, i.e., random partitioning or full replication.
+	 */
+	ANY_DISTRIBUTION,
+
 	/**
 	 * Constant indicating no particular partitioning (i.e. random) data distribution.
 	 */
-	RANDOM,
+	RANDOM_PARTITIONED,
 
 	/**
 	 * Constant indicating a hash partitioning.
@@ -85,7 +90,7 @@ public enum PartitioningProperty {
 	 * @return True, if the data is partitioned on a key.
 	 */
 	public boolean isPartitionedOnKey() {
-		return isPartitioned() && this != RANDOM;
+		return isPartitioned() && this != RANDOM_PARTITIONED;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
index f304bf6..daaa7dc 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
@@ -53,7 +53,7 @@ public final class RequestedGlobalProperties implements Cloneable {
 	 * Initializes the global properties with no partitioning.
 	 */
 	public RequestedGlobalProperties() {
-		this.partitioning = PartitioningProperty.RANDOM;
+		this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -96,8 +96,14 @@ public final class RequestedGlobalProperties implements Cloneable {
 		this.ordering = null;
 	}
 	
-	public void setRandomDistribution() {
-		this.partitioning = PartitioningProperty.RANDOM;
+	public void setRandomPartitioning() {
+		this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
+		this.partitioningFields = null;
+		this.ordering = null;
+	}
+
+	public void setAnyDistribution() {
+		this.partitioning = PartitioningProperty.ANY_DISTRIBUTION;
 		this.partitioningFields = null;
 		this.ordering = null;
 	}
@@ -174,14 +180,14 @@ public final class RequestedGlobalProperties implements Cloneable {
 	 * Checks, if the properties in this object are trivial, i.e. only standard values.
 	 */
 	public boolean isTrivial() {
-		return this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM;
+		return this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM_PARTITIONED;
 	}
 
 	/**
 	 * This method resets the properties to a state where no properties are given.
 	 */
 	public void reset() {
-		this.partitioning = PartitioningProperty.RANDOM;
+		this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
 		this.ordering = null;
 		this.partitioningFields = null;
 		this.dataDistribution = null;
@@ -208,7 +214,8 @@ public final class RequestedGlobalProperties implements Cloneable {
 			case FULL_REPLICATION:
 			case FORCED_REBALANCED:
 			case CUSTOM_PARTITIONING:
-			case RANDOM:
+			case RANDOM_PARTITIONED:
+			case ANY_DISTRIBUTION:
 				// make sure that certain properties are not pushed down
 				return null;
 			case HASH_PARTITIONED:
@@ -255,13 +262,15 @@ public final class RequestedGlobalProperties implements Cloneable {
 	 * @return True, if the properties are met, false otherwise.
 	 */
 	public boolean isMetBy(GlobalProperties props) {
-		if (this.partitioning == PartitioningProperty.FULL_REPLICATION) {
+		if (this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) {
+			return true;
+		} else if (this.partitioning == PartitioningProperty.FULL_REPLICATION) {
 			return props.isFullyReplicated();
 		}
 		else if (props.isFullyReplicated()) {
 			return false;
 		}
-		else if (this.partitioning == PartitioningProperty.RANDOM) {
+		else if (this.partitioning == PartitioningProperty.RANDOM_PARTITIONED) {
 			return true;
 		}
 		else if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) {
@@ -295,9 +304,17 @@ public final class RequestedGlobalProperties implements Cloneable {
 	 * @param globalDopChange
 	 */
 	public void parameterizeChannel(Channel channel, boolean globalDopChange) {
+
+		// safety check. Fully replicated input must be preserved.
+		if(channel.getSource().getGlobalProperties().isFullyReplicated() &&
+				(	this.partitioning != PartitioningProperty.FULL_REPLICATION ||
+					this.partitioning != PartitioningProperty.ANY_DISTRIBUTION)) {
+			throw new CompilerException("Fully replicated input must be preserved and may not be converted into another global property.");
+		}
+
 		// if we request nothing, then we need no special strategy. forward, if the number of instances remains
 		// the same, randomly repartition otherwise
-		if (isTrivial()) {
+		if (isTrivial() || this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) {
 			channel.setShipStrategy(globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD);
 			return;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
index d8f7746..b1c3079 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
@@ -144,7 +144,7 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 	public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
 		GlobalProperties gp = GlobalProperties.combine(in1, in2);
 		if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 &&
-					gp.getPartitioning() == PartitioningProperty.RANDOM)
+					gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
 		{
 			gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java
index 0390c06..2d74bbe 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupReduceProperties.java
@@ -57,7 +57,7 @@ public final class AllGroupReduceProperties extends OperatorDescriptorSingle {
 	@Override
 	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
 		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-				gProps.getPartitioning() == PartitioningProperty.RANDOM)
+				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
 		{
 			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
index 2c2ddf1..54885a7 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AllGroupWithPartialPreGroupProperties.java
@@ -79,7 +79,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip
 	@Override
 	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
 		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-				gProps.getPartitioning() == PartitioningProperty.RANDOM)
+				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
 		{
 			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
index fefd71a..cca0bb0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CartesianProductDescriptor.java
@@ -100,7 +100,7 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual
 	public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
 		GlobalProperties gp = GlobalProperties.combine(in1, in2);
 		if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 &&
-					gp.getPartitioning() == PartitioningProperty.RANDOM)
+					gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
 		{
 			gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
index bc83c51..ff4ca6e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
@@ -183,7 +183,7 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 	public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
 		GlobalProperties gp = GlobalProperties.combine(in1, in2);
 		if (gp.getUniqueFieldCombination() != null && gp.getUniqueFieldCombination().size() > 0 &&
-					gp.getPartitioning() == PartitioningProperty.RANDOM)
+					gp.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
 		{
 			gp.setAnyPartitioning(gp.getUniqueFieldCombination().iterator().next().toFieldList());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java
index 23c32a7..6007709 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CollectorMapDescriptor.java
@@ -47,7 +47,9 @@ public class CollectorMapDescriptor extends OperatorDescriptorSingle {
 
 	@Override
 	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-		return Collections.singletonList(new RequestedGlobalProperties());
+		RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+		rgp.setAnyDistribution();
+		return Collections.singletonList(rgp);
 	}
 
 	@Override
@@ -58,7 +60,7 @@ public class CollectorMapDescriptor extends OperatorDescriptorSingle {
 	@Override
 	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
 		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-				gProps.getPartitioning() == PartitioningProperty.RANDOM)
+				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
 		{
 			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java
index b7ee761..0b4f373 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FilterDescriptor.java
@@ -46,7 +46,9 @@ public class FilterDescriptor extends OperatorDescriptorSingle {
 
 	@Override
 	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-		return Collections.singletonList(new RequestedGlobalProperties());
+		RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+		rgp.setAnyDistribution();
+		return Collections.singletonList(rgp);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java
index 66993c4..6b06232 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/FlatMapDescriptor.java
@@ -47,7 +47,9 @@ public class FlatMapDescriptor extends OperatorDescriptorSingle {
 
 	@Override
 	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-		return Collections.singletonList(new RequestedGlobalProperties());
+		RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+		rgp.setAnyDistribution();
+		return Collections.singletonList(rgp);
 	}
 
 	@Override
@@ -58,7 +60,7 @@ public class FlatMapDescriptor extends OperatorDescriptorSingle {
 	@Override
 	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
 		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-				gProps.getPartitioning() == PartitioningProperty.RANDOM)
+				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
 		{
 			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
index ab93170..8d52503 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
@@ -114,7 +114,7 @@ public final class GroupReduceProperties extends OperatorDescriptorSingle {
 	@Override
 	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
 		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-				gProps.getPartitioning() == PartitioningProperty.RANDOM)
+				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
 		{
 			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
index 8604951..fd263e6 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
@@ -146,7 +146,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 	@Override
 	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
 		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-				gProps.getPartitioning() == PartitioningProperty.RANDOM)
+				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
 		{
 			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java
index cb1d258..673716d 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapDescriptor.java
@@ -46,7 +46,9 @@ public class MapDescriptor extends OperatorDescriptorSingle {
 
 	@Override
 	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-		return Collections.singletonList(new RequestedGlobalProperties());
+		RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+		rgp.setAnyDistribution();
+		return Collections.singletonList(rgp);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
index f0af88b..dc67321 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/MapPartitionDescriptor.java
@@ -46,7 +46,9 @@ public class MapPartitionDescriptor extends OperatorDescriptorSingle {
 
 	@Override
 	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
-		return Collections.singletonList(new RequestedGlobalProperties());
+		RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+		rgp.setAnyDistribution();
+		return Collections.singletonList(rgp);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
index 45daceb..7919b2b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorSingle.java
@@ -38,7 +38,7 @@ public abstract class OperatorDescriptorSingle implements AbstractOperatorDescri
 	
 	protected final FieldSet keys;			// the set of key fields
 	protected final FieldList keyList;		// the key fields with ordered field positions
-	
+
 	private List<RequestedGlobalProperties> globalProps;
 	private List<RequestedLocalProperties> localProps;
 	
@@ -51,8 +51,8 @@ public abstract class OperatorDescriptorSingle implements AbstractOperatorDescri
 		this.keys = keys;
 		this.keyList = keys == null ? null : keys.toFieldList();
 	}
-	
-	
+
+
 	public List<RequestedGlobalProperties> getPossibleGlobalProperties() {
 		if (this.globalProps == null) {
 			this.globalProps = createPossibleGlobalProperties();
@@ -66,7 +66,7 @@ public abstract class OperatorDescriptorSingle implements AbstractOperatorDescri
 		}
 		return this.localProps;
 	}
-	
+
 	/**
 	 * Returns a list of global properties that are required by this operator descriptor.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
index cf33bbe..7954773 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
@@ -76,7 +76,7 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
 	@Override
 	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
 		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-				gProps.getPartitioning() == PartitioningProperty.RANDOM)
+				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
 		{
 			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
index 813af20..000079d 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
@@ -103,7 +103,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 	@Override
 	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
 		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
-				gProps.getPartitioning() == PartitioningProperty.RANDOM)
+				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
 		{
 			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
index c3a4c3a..c90a89b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.compiler;
 
+import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
 import org.junit.Assert;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.record.operators.FileDataSink;
@@ -257,8 +258,15 @@ public class DOPChangeTest extends CompilerTestBase {
 		// mapper respectively reducer
 		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
 		SingleInputPlanNode red2Node = (SingleInputPlanNode) sinkNode.getPredecessor();
+		SingleInputPlanNode map2Node = (SingleInputPlanNode) red2Node.getPredecessor();
+
+		Assert.assertTrue("The no sorting local strategy.",
+				LocalStrategy.SORT == red2Node.getInput().getLocalStrategy() ||
+						LocalStrategy.SORT == map2Node.getInput().getLocalStrategy());
 
-		Assert.assertEquals("The Reduce 2 Node has an invalid local strategy.", LocalStrategy.SORT, red2Node.getInput().getLocalStrategy());
+		Assert.assertTrue("The no partitioning ship strategy.",
+				ShipStrategyType.PARTITION_HASH == red2Node.getInput().getShipStrategy() ||
+						ShipStrategyType.PARTITION_HASH == map2Node.getInput().getShipStrategy());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java
new file mode 100644
index 0000000..1cbac25
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/ReplicatingDataSourceTest.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.io.ReplicatingInputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "deprecation"})
+public class ReplicatingDataSourceTest extends CompilerTestBase {
+
+	/**
+	 * Tests join program with replicated data source.
+	 */
+	@Test
+	public void checkJoinWithReplicatedSourceInput() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.join(source2).where("*").equalTo("*")
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		// when join should have forward strategy on both sides
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+		ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
+		ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
+
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
+	}
+
+	/**
+	 * Tests join program with replicated data source behind map.
+	 */
+	@Test
+	public void checkJoinWithReplicatedSourceInputBehindMap() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.map(new IdMap())
+				.join(source2).where("*").equalTo("*")
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		// when join should have forward strategy on both sides
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+		ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
+		ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
+
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
+	}
+
+	/**
+	 * Tests join program with replicated data source behind filter.
+	 */
+	@Test
+	public void checkJoinWithReplicatedSourceInputBehindFilter() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.filter(new NoFilter())
+				.join(source2).where("*").equalTo("*")
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		// when join should have forward strategy on both sides
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+		ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
+		ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
+
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
+	}
+
+	/**
+	 * Tests join program with replicated data source behind flatMap.
+	 */
+	@Test
+	public void checkJoinWithReplicatedSourceInputBehindFlatMap() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.flatMap(new IdFlatMap())
+				.join(source2).where("*").equalTo("*")
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		// when join should have forward strategy on both sides
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+		ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
+		ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
+
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
+	}
+
+	/**
+	 * Tests join program with replicated data source behind map partition.
+	 */
+	@Test
+	public void checkJoinWithReplicatedSourceInputBehindMapPartition() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.mapPartition(new IdPMap())
+				.join(source2).where("*").equalTo("*")
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		// when join should have forward strategy on both sides
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+		ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
+		ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
+
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
+	}
+
+	/**
+	 * Tests join program with replicated data source behind multiple map ops.
+	 */
+	@Test
+	public void checkJoinWithReplicatedSourceInputBehindMultiMaps() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.filter(new NoFilter())
+				.mapPartition(new IdPMap())
+				.flatMap(new IdFlatMap())
+				.map(new IdMap())
+				.join(source2).where("*").equalTo("*")
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		// when join should have forward strategy on both sides
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+		ShipStrategyType joinIn1 = joinNode.getInput1().getShipStrategy();
+		ShipStrategyType joinIn2 = joinNode.getInput2().getShipStrategy();
+
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn1);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, joinIn2);
+	}
+
+	/**
+	 * Tests cross program with replicated data source.
+	 */
+	@Test
+	public void checkCrossWithReplicatedSourceInput() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.cross(source2)
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		// when cross should have forward strategy on both sides
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode crossNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+		ShipStrategyType crossIn1 = crossNode.getInput1().getShipStrategy();
+		ShipStrategyType crossIn2 = crossNode.getInput2().getShipStrategy();
+
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn1);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn2);
+	}
+
+	/**
+	 * Tests cross program with replicated data source behind map and filter.
+	 */
+	@Test
+	public void checkCrossWithReplicatedSourceInputBehindMap() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.map(new IdMap())
+				.filter(new NoFilter())
+				.cross(source2)
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+		// check the optimized Plan
+		// when cross should have forward strategy on both sides
+		SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode crossNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+		ShipStrategyType crossIn1 = crossNode.getInput1().getShipStrategy();
+		ShipStrategyType crossIn2 = crossNode.getInput2().getShipStrategy();
+
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn1);
+		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, crossIn2);
+	}
+
+	/**
+	 * Tests compiler fail for join program with replicated data source and changing DOP.
+	 */
+	@Test(expected = CompilerException.class)
+	public void checkJoinWithReplicatedSourceInputChangingDOP() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.join(source2).where("*").equalTo("*").setParallelism(DEFAULT_PARALLELISM+2)
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+	}
+
+	/**
+	 * Tests compiler fail for join program with replicated data source behind map and changing DOP.
+	 */
+	@Test(expected = CompilerException.class)
+	public void checkJoinWithReplicatedSourceInputBehindMapChangingDOP() {
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.map(new IdMap()).setParallelism(DEFAULT_PARALLELISM+1)
+				.join(source2).where("*").equalTo("*")
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+
+	}
+
+	/**
+	 * Tests compiler fail for join program with replicated data source behind reduce.
+	 */
+	@Test(expected = CompilerException.class)
+	public void checkJoinWithReplicatedSourceInputBehindReduce() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.reduce(new LastReduce())
+				.join(source2).where("*").equalTo("*")
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+	}
+
+	/**
+	 * Tests compiler fail for join program with replicated data source behind rebalance.
+	 */
+	@Test(expected = CompilerException.class)
+	public void checkJoinWithReplicatedSourceInputBehindRebalance() {
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
+
+		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), String.class));
+
+		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
+		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
+
+		DataSink<Tuple2<Tuple1<String>, Tuple1<String>>> out = source1
+				.rebalance()
+				.join(source2).where("*").equalTo("*")
+				.writeAsText("/some/newpath");
+
+		JavaPlan plan = env.createProgramPlan();
+
+		// submit the plan to the compiler
+		OptimizedPlan oPlan = compileNoStats(plan);
+	}
+
+
+	public static class IdMap<T> implements MapFunction<T,T> {
+
+		@Override
+		public T map(T value) throws Exception {
+			return value;
+		}
+	}
+
+	public static class NoFilter<T> implements FilterFunction<T> {
+
+		@Override
+		public boolean filter(T value) throws Exception {
+			return false;
+		}
+	}
+
+	public static class IdFlatMap<T> implements FlatMapFunction<T,T> {
+
+		@Override
+		public void flatMap(T value, Collector<T> out) throws Exception {
+			out.collect(value);
+		}
+	}
+
+	public static class IdPMap<T> implements MapPartitionFunction<T,T> {
+
+		@Override
+		public void mapPartition(Iterable<T> values, Collector<T> out) throws Exception {
+			for(T v : values) {
+				out.collect(v);
+			}
+		}
+	}
+
+	public static class LastReduce<T> implements ReduceFunction<T> {
+
+		@Override
+		public T reduce(T value1, T value2) throws Exception {
+			return value2;
+		}
+	}
+
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
index f8e2242..ff7530a 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesFilteringTest.java
@@ -59,7 +59,7 @@ public class GlobalPropertiesFilteringTest {
 
 		GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
 
-		assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
 		assertNull(result.getPartitioningFields());
 		assertNull(result.getPartitioningOrdering());
 		assertNull(result.getUniqueFieldCombination());
@@ -78,7 +78,7 @@ public class GlobalPropertiesFilteringTest {
 
 		GlobalProperties result = gprops.filterBySemanticProperties(semProps, 0);
 
-		assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
 		assertNull(result.getPartitioningFields());
 		assertNull(result.getPartitioningOrdering());
 		assertNull(result.getUniqueFieldCombination());
@@ -133,7 +133,7 @@ public class GlobalPropertiesFilteringTest {
 
 		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
 
-		assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
 		assertNull(result.getPartitioningFields());
 	}
 
@@ -186,7 +186,7 @@ public class GlobalPropertiesFilteringTest {
 
 		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
 
-		assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
 		assertNull(result.getPartitioningFields());
 	}
 
@@ -242,7 +242,7 @@ public class GlobalPropertiesFilteringTest {
 
 		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
 
-		assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
 		assertNull(result.getPartitioningFields());
 		assertNull(result.getCustomPartitioner());
 	}
@@ -330,7 +330,7 @@ public class GlobalPropertiesFilteringTest {
 
 		GlobalProperties result = gprops.filterBySemanticProperties(sprops, 0);
 
-		assertEquals(PartitioningProperty.RANDOM, result.getPartitioning());
+		assertEquals(PartitioningProperty.RANDOM_PARTITIONED, result.getPartitioning());
 		assertNull(result.getPartitioningOrdering());
 		assertNull(result.getPartitioningFields());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
index e094640..3f9c0db 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/RequestedGlobalPropertiesFilteringTest.java
@@ -345,7 +345,7 @@ public class RequestedGlobalPropertiesFilteringTest {
 		SemanticPropUtil.getSemanticPropsSingleFromString(sProp, new String[]{"0;1;2"}, null, null, tupleInfo, tupleInfo);
 
 		RequestedGlobalProperties rgProps = new RequestedGlobalProperties();
-		rgProps.setRandomDistribution();
+		rgProps.setRandomPartitioning();
 
 		RequestedGlobalProperties filtered = rgProps.filterBySemanticProperties(sProp, 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
new file mode 100644
index 0000000..0adccaf
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+
+import java.io.IOException;
+
+/**
+ * A ReplicatingInputFormat replicates any {@link InputFormat} to all parallel instances of a DataSource,
+ * i.e., the full input of the replicated InputFormat is completely processed by each parallel instance of the DataSource.
+ * This is done by assigning all {@link org.apache.flink.core.io.InputSplit}s generated by the
+ * replicated InputFormat to each parallel instance.
+ *
+ * Replicated data can only be used as input for a {@link org.apache.flink.api.common.operators.base.JoinOperatorBase} or
+ * {@link org.apache.flink.api.common.operators.base.CrossOperatorBase} with the same degree of parallelism as the DataSource.
+ * Before being used as an input to a Join or Cross operator, replicated data might be processed in local pipelines
+ * by Map-based operators with the same degree of parallelism as the source. Map-based operators are
+ * {@link org.apache.flink.api.common.operators.base.MapOperatorBase},
+ * {@link org.apache.flink.api.common.operators.base.FlatMapOperatorBase},
+ * {@link org.apache.flink.api.common.operators.base.FilterOperatorBase}, and
+ * {@link org.apache.flink.api.common.operators.base.MapPartitionOperatorBase}.
+ *
+ * Replicated DataSources can be used for local join processing (no data shipping) if one input is accessible on all
+ * parallel instance of a join and the other input is (randomly) partitioned across all parallel instances.
+ *
+ * However, a replicated DataSource is a plan hint that can invalidate a Flink program if not used correctly (see
+ * usage instructions above). In such situations, the optimizer is not able to generate a valid execution plan and
+ * the program execution will fail.
+ *
+ * @param <OT> The output type of the wrapped InputFormat.
+ * @param <S> The InputSplit type of the wrapped InputFormat.
+ *
+ * @see org.apache.flink.api.common.io.InputFormat
+ * @see org.apache.flink.api.common.operators.base.JoinOperatorBase
+ * @see org.apache.flink.api.common.operators.base.CrossOperatorBase
+ * @see org.apache.flink.api.common.operators.base.MapOperatorBase
+ * @see org.apache.flink.api.common.operators.base.FlatMapOperatorBase
+ * @see org.apache.flink.api.common.operators.base.FilterOperatorBase
+ * @see org.apache.flink.api.common.operators.base.MapPartitionOperatorBase
+ */
+public final class ReplicatingInputFormat<OT, S extends InputSplit> implements InputFormat<OT, S> {
+
+	private static final long serialVersionUID = 1L;
+
+	private InputFormat<OT, S> replicatedIF;
+
+	public ReplicatingInputFormat(InputFormat<OT, S> wrappedIF) {
+		this.replicatedIF = wrappedIF;
+	}
+
+	public InputFormat<OT, S> getReplicatedInputFormat() {
+		return this.replicatedIF;
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		this.replicatedIF.configure(parameters);
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+		return this.replicatedIF.getStatistics(cachedStatistics);
+	}
+
+	@Override
+	public S[] createInputSplits(int minNumSplits) throws IOException {
+		return this.replicatedIF.createInputSplits(minNumSplits);
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(S[] inputSplits) {
+		return new ReplicatingInputSplitAssigner(inputSplits);
+	}
+
+	@Override
+	public void open(S split) throws IOException {
+		this.replicatedIF.open(split);
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return this.replicatedIF.reachedEnd();
+	}
+
+	@Override
+	public OT nextRecord(OT reuse) throws IOException {
+		return this.replicatedIF.nextRecord(reuse);
+	}
+
+	@Override
+	public void close() throws IOException {
+		this.replicatedIF.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java
new file mode 100644
index 0000000..315fbcd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputSplitAssigner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Assigns each InputSplit to each requesting parallel instance.
+ * This causes the input to be fully replicated, i.e., each parallel instance consumes the full input.
+ */
+public class ReplicatingInputSplitAssigner implements InputSplitAssigner {
+
+	private InputSplit[] inputSplits;
+
+	private int[] assignCounts;
+
+	public ReplicatingInputSplitAssigner(Collection<InputSplit> splits) {
+		this.inputSplits = new InputSplit[splits.size()];
+		this.inputSplits = splits.toArray(this.inputSplits);
+		this.assignCounts = new int[32];
+		Arrays.fill(assignCounts, 0);
+	}
+
+	public ReplicatingInputSplitAssigner(InputSplit[] splits) {
+		this.inputSplits = splits;
+		this.assignCounts = new int[32];
+		Arrays.fill(assignCounts, 0);
+	}
+
+	@Override
+	public InputSplit getNextInputSplit(String host, int taskId) {
+
+		// get assignment count
+		Integer assignCnt;
+		if(taskId < this.assignCounts.length) {
+			assignCnt = this.assignCounts[taskId];
+		} else {
+			int newSize = this.assignCounts.length * 2;
+			if (taskId >= newSize) {
+				newSize = taskId;
+			}
+			int[] newAssignCounts = Arrays.copyOf(assignCounts, newSize);
+			Arrays.fill(newAssignCounts, assignCounts.length, newSize, 0);
+
+			assignCnt = 0;
+		}
+
+		if(assignCnt >= inputSplits.length) {
+			// all splits for this task have been assigned
+			return null;
+		} else {
+			// return next splits
+			InputSplit is = inputSplits[assignCnt];
+			assignCounts[taskId] = assignCnt+1;
+			return is;
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a19b4a02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
new file mode 100644
index 0000000..85e3e11
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReplicatingDataSourceITCase.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.ReplicatingInputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.NumberSequenceIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests for replicating DataSources
+ */
+
@RunWith(Parameterized.class)
public class ReplicatingDataSourceITCase extends MultipleProgramsTestBase {

	public ReplicatingDataSourceITCase(MultipleProgramsTestBase.ExecutionMode mode){
		super(mode);
	}

	// file path the test program writes its result to
	private String resultPath;

	// expected file content; each test sets this before the @After check runs
	private String expectedResult;

	@Rule
	public TemporaryFolder tempFolder = new TemporaryFolder();

	@Before
	public void before() throws Exception{
		resultPath = tempFolder.newFile().toURI().toString();
	}

	@After
	public void after() throws Exception{
		// shared verification for all tests: compare the written file against expectedResult
		compareResultsByLinesInMemory(expectedResult, resultPath); // 500500 = 0+1+2+3+...+999+1000
	}

	@Test
	public void testReplicatedSourceToJoin() throws Exception {
		/*
		 * Test replicated source going into join
		 */

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// source1 replicates the sequence 0..1000 to every parallel instance;
		// source2 is a regular (partitioned) source of the same sequence
		DataSet<Tuple1<Long>> source1 = env.createInput(new ReplicatingInputFormat<Long, GenericInputSplit>
				(new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0l, 1000l))), BasicTypeInfo.LONG_TYPE_INFO)
				.map(new ToTuple());
		DataSet<Tuple1<Long>> source2 = env.generateSequence(0l, 1000l).map(new ToTuple());

		// joining on the value keeps each number exactly once; summing yields 500500
		DataSet<Tuple> pairs = source1.join(source2).where(0).equalTo(0)
				.projectFirst(0)
				.sum(0);

		pairs.writeAsText(resultPath);
		env.execute();

		expectedResult = "(500500)";

	}

	@Test
	public void testReplicatedSourceToCross() throws Exception {
		/*
		 * Test replicated source going into cross
		 */

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// source1 replicates the sequence 0..1000 to every parallel instance;
		// source2 is a regular (partitioned) source of the same sequence
		DataSet<Tuple1<Long>> source1 = env.createInput(new ReplicatingInputFormat<Long, GenericInputSplit>
				(new ParallelIteratorInputFormat<Long>(new NumberSequenceIterator(0l, 1000l))), BasicTypeInfo.LONG_TYPE_INFO)
				.map(new ToTuple());
		DataSet<Tuple1<Long>> source2 = env.generateSequence(0l, 1000l).map(new ToTuple());

		// cross produces all pairs; the filter keeps only equal pairs, which is
		// equivalent to the join above, so the sum is again 500500
		DataSet<Tuple1<Long>> pairs = source1.cross(source2)
				.filter(new FilterFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>>() {
					@Override
					public boolean filter(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception {
						return value.f0.f0.equals(value.f1.f0);
					}
				})
				.map(new MapFunction<Tuple2<Tuple1<Long>, Tuple1<Long>>, Tuple1<Long>>() {
					@Override
					public Tuple1<Long> map(Tuple2<Tuple1<Long>, Tuple1<Long>> value) throws Exception {
						return value.f0;
					}
				})
				.sum(0);

		pairs.writeAsText(resultPath);
		env.execute();

		expectedResult = "(500500)";

	}


	// wraps a Long into a Tuple1 so the value can be used as a join/cross key
	public static class ToTuple implements MapFunction<Long, Tuple1<Long>> {

		@Override
		public Tuple1<Long> map(Long value) throws Exception {
			return new Tuple1<Long>(value);
		}
	}


}


Mime
View raw message