flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject flink git commit: [FLINK-1628] [optimizer] Fix partitioning properties for Joins and CoGroups.
Date Tue, 10 Mar 2015 07:53:08 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.8 d573926ab -> 88c7ea256


[FLINK-1628] [optimizer] Fix partitioning properties for Joins and CoGroups.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88c7ea25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88c7ea25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88c7ea25

Branch: refs/heads/release-0.8
Commit: 88c7ea256e263dec52fbcbbd1e681ae088075edb
Parents: d573926
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Mar 4 18:49:22 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Mar 10 08:52:47 2015 +0100

----------------------------------------------------------------------
 .../dataproperties/GlobalProperties.java        |   8 +
 .../RequestedGlobalProperties.java              |  48 +-
 .../operators/AbstractJoinDescriptor.java       |  49 +-
 .../compiler/operators/CoGroupDescriptor.java   |  72 +-
 .../operators/OperatorDescriptorDual.java       |  44 +-
 .../operators/SortMergeJoinDescriptor.java      |  13 +-
 .../compiler/FeedbackPropertiesMatchTest.java   |   8 +-
 .../flink/compiler/PartitioningReusageTest.java | 859 +++++++++++++++++++
 .../GlobalPropertiesMatchingTest.java           | 152 +++-
 .../api/common/operators/util/FieldList.java    |  15 +-
 10 files changed, 1214 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88c7ea25/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
index 7dedc53..001d847 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
@@ -180,6 +180,14 @@ public class GlobalProperties implements Cloneable {
 			return false;
 		}
 	}
+
+	public boolean isExactlyPartitionedOnFields(FieldList fields) {
+		if (this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields)) {
+			return true;
+		} else {
+			return false;
+		}
+	}
 	
 	public boolean matchesOrderedPartitioning(Ordering o) {
 		if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {

http://git-wip-us.apache.org/repos/asf/flink/blob/88c7ea25/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
index 4e9d60a..33c8476 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler.dataproperties;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.dag.OptimizerNode;
@@ -46,7 +47,7 @@ public final class RequestedGlobalProperties implements Cloneable {
 	private DataDistribution dataDistribution;	// optional data distribution, for a range partitioning
 	
 	private Partitioner<?> customPartitioner;	// optional, partitioner for custom partitioning
-	
+
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -60,7 +61,9 @@ public final class RequestedGlobalProperties implements Cloneable {
 	
 	/**
 	 * Sets the partitioning property for the global properties.
-	 * 
+	 * If the partitionedFields are provided as a {@link FieldSet}, subsets are also valid;
+	 * if provided as a {@link FieldList}, the partitioning fields must match exactly, incl. order.
+	 *
 	 * @param partitionedFields
 	 */
 	public void setHashPartitioned(FieldSet partitionedFields) {
@@ -86,7 +89,14 @@ public final class RequestedGlobalProperties implements Cloneable {
 		this.partitioningFields = null;
 		this.dataDistribution = dataDistribution;
 	}
-	
+
+	/**
+	 * Sets the partitioning property for the global properties.
+	 * If the partitionedFields are provided as a {@link FieldSet}, subsets are also valid;
+	 * if provided as a {@link FieldList}, the partitioning fields must match exactly, incl. order.
+	 *
+	 * @param partitionedFields
+	 */
 	public void setAnyPartitioning(FieldSet partitionedFields) {
 		if (partitionedFields == null) {
 			throw new NullPointerException();
@@ -113,7 +123,14 @@ public final class RequestedGlobalProperties implements Cloneable {
 		this.partitioningFields = null;
 		this.ordering = null;
 	}
-	
+
+	/**
+	 * Sets the partitioning property for the global properties.
+	 * If the partitionedFields are provided as a {@link FieldSet}, subsets are also valid;
+	 * if provided as a {@link FieldList}, the partitioning fields must match exactly, incl. order.
+	 *
+	 * @param partitionedFields
+	 */
 	public void setCustomPartitioned(FieldSet partitionedFields, Partitioner<?> partitioner) {
 		if (partitionedFields == null || partitioner == null) {
 			throw new NullPointerException();
@@ -124,7 +141,7 @@ public final class RequestedGlobalProperties implements Cloneable {
 		this.ordering = null;
 		this.customPartitioner = partitioner;
 	}
-	
+
 	/**
 	 * Gets the partitioning property.
 	 * 
@@ -142,7 +159,7 @@ public final class RequestedGlobalProperties implements Cloneable {
 	public FieldSet getPartitionedFields() {
 		return this.partitioningFields;
 	}
-	
+
 	/**
 	 * Gets the key order.
 	 * 
@@ -242,11 +259,11 @@ public final class RequestedGlobalProperties implements Cloneable {
 			return true;
 		}
 		else if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) {
-			return props.isPartitionedOnFields(this.partitioningFields);
+			return checkCompatiblePartitioningFields(props);
 		}
 		else if (this.partitioning == PartitioningProperty.HASH_PARTITIONED) {
 			return props.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
-					props.isPartitionedOnFields(this.partitioningFields);
+					checkCompatiblePartitioningFields(props);
 		}
 		else if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {
 			return props.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
@@ -257,14 +274,15 @@ public final class RequestedGlobalProperties implements Cloneable {
 		}
 		else if (this.partitioning == PartitioningProperty.CUSTOM_PARTITIONING) {
 			return props.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
-					props.isPartitionedOnFields(this.partitioningFields) &&
+					checkCompatiblePartitioningFields(props) &&
 					props.getCustomPartitioner().equals(this.customPartitioner);
+
 		}
 		else {
 			throw new CompilerException("Properties matching logic leaves open cases.");
 		}
 	}
-	
+
 	/**
 	 * Parameterizes the ship strategy fields of a channel such that the channel produces the desired global properties.
 	 * 
@@ -272,6 +290,7 @@ public final class RequestedGlobalProperties implements Cloneable {
 	 * @param globalDopChange
 	 */
 	public void parameterizeChannel(Channel channel, boolean globalDopChange) {
+
 		// if we request nothing, then we need no special strategy. forward, if the number of instances remains
 		// the same, randomly repartition otherwise
 		if (isTrivial()) {
@@ -357,4 +376,13 @@ public final class RequestedGlobalProperties implements Cloneable {
 			throw new RuntimeException(cnse);
 		}
 	}
+
+	private boolean checkCompatiblePartitioningFields(GlobalProperties props) {
+		if(this.partitioningFields instanceof FieldList) {
+			// partitioningFields as FieldList requires strict checking!
+			return props.isExactlyPartitionedOnFields((FieldList)this.partitioningFields);
+		} else {
+			return props.isPartitionedOnFields(this.partitioningFields);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88c7ea25/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
index d8f7746..ebcf123 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
@@ -64,15 +64,15 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 		if (repartitionAllowed) {
 			// partition both (hash or custom)
 			if (this.customPartitioner == null) {
-				
+
 				// we accept compatible partitionings of any type
 				RequestedGlobalProperties partitioned_left_any = new RequestedGlobalProperties();
 				RequestedGlobalProperties partitioned_right_any = new RequestedGlobalProperties();
 				partitioned_left_any.setAnyPartitioning(this.keys1);
 				partitioned_right_any.setAnyPartitioning(this.keys2);
 				pairs.add(new GlobalPropertiesPair(partitioned_left_any, partitioned_right_any));
-				
-				// we also explicitly add hash partitioning, as a fallback, if the any-pairs do not match
+
+				// add strict hash partitioning of both inputs on their full key sets
 				RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties();
 				RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties();
 				partitioned_left_hash.setHashPartitioned(this.keys1);
@@ -82,10 +82,10 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 			else {
 				RequestedGlobalProperties partitioned_left = new RequestedGlobalProperties();
 				partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner);
-				
+
 				RequestedGlobalProperties partitioned_right = new RequestedGlobalProperties();
 				partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner);
-				
+
 				return Collections.singletonList(new GlobalPropertiesPair(partitioned_left, partitioned_right));
 			}
 			
@@ -130,10 +130,40 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 			GlobalProperties produced1, GlobalProperties produced2)
 	{
 		if (requested1.getPartitioning().isPartitionedOnKey() && requested2.getPartitioning().isPartitionedOnKey()) {
-			return produced1.getPartitioning() == produced2.getPartitioning() && 
-					(produced1.getCustomPartitioner() == null ? 
-						produced2.getCustomPartitioner() == null :
-						produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()));
+
+			if(produced1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+					produced2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
+
+				// both are hash partitioned, check that partitioning fields are equivalently chosen
+				return checkEquivalentFieldPositionsInKeyFields(
+						produced1.getPartitioningFields(), produced2.getPartitioningFields());
+
+			}
+			else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+					produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
+
+				// both are range partitioned, check that partitioning fields are equivalently chosen
+				return checkEquivalentFieldPositionsInKeyFields(
+						produced1.getPartitioningFields(), produced2.getPartitioningFields());
+
+			}
+			else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
+					produced2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING) {
+
+				// both use a custom partitioner. Check that both keys are exactly as specified and that both use the same partitioner
+				return produced1.getPartitioningFields().isExactMatch(this.keys1) &&
+						produced2.getPartitioningFields().isExactMatch(this.keys2) &&
+						produced1.getCustomPartitioner() != null && produced2.getCustomPartitioner() != null &&
+						produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner());
+
+			}
+			else {
+
+				// no other partitioning valid, incl. ANY_PARTITIONING.
+				//   For joins we must ensure that both sides are exactly identically partitioned, ANY is not good enough.
+				return false;
+			}
+
 		} else {
 			return true;
 		}
@@ -151,4 +181,5 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 		gp.clearUniqueFieldCombinations();
 		return gp;
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88c7ea25/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
index bc83c51..16a393a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
@@ -99,27 +99,31 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 
 	@Override
 	protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
+
 		if (this.customPartitioner == null) {
+
+			// we accept compatible partitionings of any type
 			RequestedGlobalProperties partitioned_left_any = new RequestedGlobalProperties();
-			RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties();
-			partitioned_left_any.setAnyPartitioning(this.keys1);
-			partitioned_left_hash.setHashPartitioned(this.keys1);
-			
 			RequestedGlobalProperties partitioned_right_any = new RequestedGlobalProperties();
-			RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties();
+			partitioned_left_any.setAnyPartitioning(this.keys1);
 			partitioned_right_any.setAnyPartitioning(this.keys2);
+
+			// add strict hash partitioning of both inputs on their full key sets
+			RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties();
+			RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties();
+			partitioned_left_hash.setHashPartitioned(this.keys1);
 			partitioned_right_hash.setHashPartitioned(this.keys2);
-			
+
 			return Arrays.asList(new GlobalPropertiesPair(partitioned_left_any, partitioned_right_any),
 					new GlobalPropertiesPair(partitioned_left_hash, partitioned_right_hash));
 		}
 		else {
 			RequestedGlobalProperties partitioned_left = new RequestedGlobalProperties();
 			partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner);
-			
+
 			RequestedGlobalProperties partitioned_right = new RequestedGlobalProperties();
 			partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner);
-			
+
 			return Collections.singletonList(new GlobalPropertiesPair(partitioned_left, partitioned_right));
 		}
 	}
@@ -135,10 +139,40 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 	public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
 			GlobalProperties produced1, GlobalProperties produced2)
 	{
-		return produced1.getPartitioning() == produced2.getPartitioning() && 
-				(produced1.getCustomPartitioner() == null ? 
-					produced2.getCustomPartitioner() == null :
-					produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner()));
+
+		if(produced1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+				produced2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
+
+			// both are hash partitioned, check that partitioning fields are equivalently chosen
+			return checkEquivalentFieldPositionsInKeyFields(
+					produced1.getPartitioningFields(), produced2.getPartitioningFields());
+
+		}
+		else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+				produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
+
+			// both are range partitioned, check that partitioning fields are equivalently chosen
+			return checkEquivalentFieldPositionsInKeyFields(
+					produced1.getPartitioningFields(), produced2.getPartitioningFields());
+
+		}
+		else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
+				produced2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING) {
+
+			// both use a custom partitioner. Check that both keys are exactly as specified and that both use the same partitioner
+			return produced1.getPartitioningFields().isExactMatch(this.keys1) &&
+					produced2.getPartitioningFields().isExactMatch(this.keys2) &&
+					produced1.getCustomPartitioner() != null && produced2.getCustomPartitioner() != null &&
+					produced1.getCustomPartitioner().equals(produced2.getCustomPartitioner());
+
+		}
+		else {
+
+			// no other partitioning valid, incl. ANY_PARTITIONING.
+			//   For co-groups we must ensure that both sides are exactly identically partitioned, ANY is not good enough.
+			return false;
+		}
+
 	}
 	
 	@Override
@@ -150,12 +184,17 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 		Ordering prod1 = produced1.getOrdering();
 		Ordering prod2 = produced2.getOrdering();
 		
-		if (prod1 == null || prod2 == null || prod1.getNumberOfFields() < numRelevantFields ||
-				prod2.getNumberOfFields() < prod2.getNumberOfFields())
-		{
+		if (prod1 == null || prod2 == null) {
 			throw new CompilerException("The given properties do not meet this operators requirements.");
 		}
-			
+
+		// check that order of fields is equivalent
+		if (!checkEquivalentFieldPositionsInKeyFields(
+				prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
+			return false;
+		}
+
+		// check that order directions are equivalent
 		for (int i = 0; i < numRelevantFields; i++) {
 			if (prod1.getOrder(i) != prod2.getOrder(i)) {
 				return false;
@@ -196,4 +235,5 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 		LocalProperties comb = LocalProperties.combine(in1, in2);
 		return comb.clearUniqueFieldSets();
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88c7ea25/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
index 8eca16e..f72e6b5 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/OperatorDescriptorDual.java
@@ -22,6 +22,7 @@ package org.apache.flink.compiler.operators;
 import java.util.List;
 
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.dag.TwoInputNode;
 import org.apache.flink.compiler.dataproperties.GlobalProperties;
 import org.apache.flink.compiler.dataproperties.LocalProperties;
@@ -81,7 +82,48 @@ public abstract class OperatorDescriptorDual implements AbstractOperatorDescript
 	public abstract GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2);
 	
 	public abstract LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2);
-	
+
+	protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2) {
+
+		// check number of produced partitioning fields
+		if(fields1.size() != fields2.size()) {
+			return false;
+		} else {
+			return checkEquivalentFieldPositionsInKeyFields(fields1, fields2, fields1.size());
+		}
+	}
+
+	protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2, int numRelevantFields) {
+
+		// check number of produced partitioning fields
+		if(fields1.size() < numRelevantFields || fields2.size() < numRelevantFields) {
+			return false;
+		}
+		else {
+			for(int i=0; i<numRelevantFields; i++) {
+				int pField1 = fields1.get(i);
+				int pField2 = fields2.get(i);
+				// check if position of both produced fields is the same in both requested fields
+				int j;
+				for(j=0; j<this.keys1.size(); j++) {
+					if(this.keys1.get(j) == pField1 && this.keys2.get(j) == pField2) {
+						break;
+					}
+					else if(this.keys1.get(j) != pField1 && this.keys2.get(j) != pField2) {
+						// do nothing
+					}
+					else {
+						return false;
+					}
+				}
+				if(j == this.keys1.size()) {
+					throw new CompilerException("Fields were not found in key fields.");
+				}
+			}
+		}
+		return true;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	
 	public static final class GlobalPropertiesPair {

http://git-wip-us.apache.org/repos/asf/flink/blob/88c7ea25/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
index cd6094e..4ca82d5 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
@@ -68,12 +68,17 @@ public class SortMergeJoinDescriptor extends AbstractJoinDescriptor {
 		Ordering prod1 = produced1.getOrdering();
 		Ordering prod2 = produced2.getOrdering();
 		
-		if (prod1 == null || prod2 == null || prod1.getNumberOfFields() < numRelevantFields ||
-				prod2.getNumberOfFields() < prod2.getNumberOfFields())
-		{
+		if (prod1 == null || prod2 == null) {
 			throw new CompilerException("The given properties do not meet this operators requirements.");
 		}
-			
+
+		// check that order of fields is equivalent
+		if (!checkEquivalentFieldPositionsInKeyFields(
+				prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
+			return false;
+		}
+
+		// check that both inputs have the same directions of order
 		for (int i = 0; i < numRelevantFields; i++) {
 			if (prod1.getOrder(i) != prod2.getOrder(i)) {
 				return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/88c7ea25/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
index e3f5267..677d9be 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
@@ -211,7 +211,7 @@ public class FeedbackPropertiesMatchTest {
 				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
 				
 				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setHashPartitioned(new FieldList(2, 5));
+				reqGp.setHashPartitioned(new FieldSet(2, 5));
 				
 				RequestedLocalProperties reqLp = new RequestedLocalProperties();
 				reqLp.setGroupedFields(new FieldList(1));
@@ -375,7 +375,7 @@ public class FeedbackPropertiesMatchTest {
 				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1, 2));
 				
 				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setHashPartitioned(new FieldList(2, 5));
+				reqGp.setHashPartitioned(new FieldSet(2, 5));
 				
 				RequestedLocalProperties reqLp = new RequestedLocalProperties();
 				reqLp.setGroupedFields(new FieldList(1));
@@ -397,7 +397,7 @@ public class FeedbackPropertiesMatchTest {
 				LocalProperties lp = new LocalProperties();
 				
 				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setHashPartitioned(new FieldList(2, 5));
+				reqGp.setHashPartitioned(new FieldSet(2, 5));
 				
 				toMap1.setRequiredGlobalProps(null);
 				toMap1.setRequiredLocalProps(null);
@@ -434,7 +434,7 @@ public class FeedbackPropertiesMatchTest {
 				LocalProperties lp = LocalProperties.forGrouping(new FieldList(1));
 				
 				RequestedGlobalProperties reqGp = new RequestedGlobalProperties();
-				reqGp.setAnyPartitioning(new FieldList(2, 5));
+				reqGp.setAnyPartitioning(new FieldSet(2, 5));
 				
 				RequestedLocalProperties reqLp = new RequestedLocalProperties();
 				reqLp.setGroupedFields(new FieldList(1));

http://git-wip-us.apache.org/repos/asf/flink/blob/88c7ea25/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java
new file mode 100644
index 0000000..c4bf560
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PartitioningReusageTest.java
@@ -0,0 +1,859 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSink;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.compiler.dag.JoinNode;
+import org.apache.flink.compiler.dataproperties.GlobalProperties;
+import org.apache.flink.compiler.dataproperties.LocalProperties;
+import org.apache.flink.compiler.dataproperties.PartitioningProperty;
+import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.compiler.plan.Channel;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.PlanNode;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Visitor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("serial")
+public class PartitioningReusageTest extends CompilerTestBase {
+
+	@Test
+	public void noPreviousPartitioningJoin1() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+					.where(0,1).equalTo(0,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+
+	}
+
+	@Test
+	public void noPreviousPartitioningJoin2() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,1).equalTo(2,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+
+	}
+
+	@Test
+	public void reuseSinglePartitioningJoin1() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.partitionByHash(0,1)
+				.map(new MockMapper()).withConstantSet("0;1")
+				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,1).equalTo(0,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+
+	}
+
+	@Test
+	public void reuseSinglePartitioningJoin2() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.partitionByHash(0,1)
+				.map(new MockMapper()).withConstantSet("0;1")
+				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,1).equalTo(2,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+	}
+
+	@Test
+	public void reuseSinglePartitioningJoin3() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.join(set2.partitionByHash(2, 1)
+							.map(new MockMapper())
+							.withConstantSet("2;1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,1).equalTo(2,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+	}
+
+	@Test
+	public void reuseSinglePartitioningJoin4() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.partitionByHash(0)
+				.map(new MockMapper()).withConstantSet("0")
+				.join(set2, JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,1).equalTo(2,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+	}
+
+	@Test
+	public void reuseSinglePartitioningJoin5() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.join(set2.partitionByHash(2)
+							.map(new MockMapper())
+							.withConstantSet("2"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,1).equalTo(2,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin1() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.partitionByHash(0,1)
+				.map(new MockMapper()).withConstantSet("0;1")
+				.join(set2.partitionByHash(0,1)
+							.map(new MockMapper())
+							.withConstantSet("0;1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,1).equalTo(0,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+	}
+
+
+	@Test
+	public void reuseBothPartitioningJoin2() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.partitionByHash(0,1)
+				.map(new MockMapper()).withConstantSet("0;1")
+				.join(set2.partitionByHash(1,2)
+								.map(new MockMapper())
+								.withConstantSet("1;2"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,1).equalTo(2,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin3() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.partitionByHash(0)
+				.map(new MockMapper()).withConstantSet("0")
+				.join(set2.partitionByHash(2,1)
+								.map(new MockMapper())
+								.withConstantSet("2;1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,1).equalTo(2,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin4() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.partitionByHash(0,2)
+				.map(new MockMapper()).withConstantSet("0;2")
+				.join(set2.partitionByHash(1)
+								.map(new MockMapper())
+								.withConstantSet("1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,2).equalTo(2,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin5() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.partitionByHash(2)
+				.map(new MockMapper()).withConstantSet("2")
+				.join(set2.partitionByHash(1)
+								.map(new MockMapper())
+								.withConstantSet("1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,2).equalTo(2,1).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin6() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.partitionByHash(0)
+				.map(new MockMapper()).withConstantSet("0")
+				.join(set2.partitionByHash(1)
+								.map(new MockMapper())
+								.withConstantSet("1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,2).equalTo(1,2).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+	}
+
+	@Test
+	public void reuseBothPartitioningJoin7() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = set1
+				.partitionByHash(2)
+				.map(new MockMapper()).withConstantSet("2")
+				.join(set2.partitionByHash(1)
+								.map(new MockMapper())
+								.withConstantSet("1"),
+						JoinOperatorBase.JoinHint.REPARTITION_HASH_FIRST)
+				.where(0,2).equalTo(1,2).with(new MockJoin());
+
+		joined.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidJoinInputProperties(join);
+	}
+
+
+	@Test
+	public void noPreviousPartitioningCoGroup1() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.coGroup(set2)
+				.where(0,1).equalTo(0,1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+
+	}
+
+	@Test
+	public void noPreviousPartitioningCoGroup2() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.coGroup(set2)
+				.where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+
+	}
+
+	@Test
+	public void reuseSinglePartitioningCoGroup1() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.partitionByHash(0,1)
+				.map(new MockMapper()).withConstantSet("0;1")
+				.coGroup(set2)
+				.where(0,1).equalTo(0,1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+
+	}
+
+	@Test
+	public void reuseSinglePartitioningCoGroup2() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.partitionByHash(0,1)
+				.map(new MockMapper()).withConstantSet("0;1")
+				.coGroup(set2)
+				.where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+	}
+
+	@Test
+	public void reuseSinglePartitioningCoGroup3() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.coGroup(set2.partitionByHash(2, 1)
+								.map(new MockMapper())
+								.withConstantSet("2;1"))
+				.where(0,1).equalTo(2, 1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+	}
+
+	@Test
+	public void reuseSinglePartitioningCoGroup4() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.partitionByHash(0)
+				.map(new MockMapper()).withConstantSet("0")
+				.coGroup(set2)
+				.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+	}
+
+	@Test
+	public void reuseSinglePartitioningCoGroup5() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.coGroup(set2.partitionByHash(2)
+								.map(new MockMapper())
+								.withConstantSet("2"))
+				.where(0,1).equalTo(2,1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+	}
+
+	@Test
+	public void reuseBothPartitioningCoGroup1() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.partitionByHash(0,1)
+				.map(new MockMapper()).withConstantSet("0;1")
+				.coGroup(set2.partitionByHash(0, 1)
+						.map(new MockMapper())
+						.withConstantSet("0;1"))
+				.where(0, 1).equalTo(0, 1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+	}
+
+
+	@Test
+	public void reuseBothPartitioningCoGroup2() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.partitionByHash(0,1)
+				.map(new MockMapper()).withConstantSet("0;1")
+				.coGroup(set2.partitionByHash(1, 2)
+						.map(new MockMapper())
+						.withConstantSet("1;2"))
+				.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+	}
+
+	@Test
+	public void reuseBothPartitioningCoGroup3() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.partitionByHash(0)
+				.map(new MockMapper()).withConstantSet("0")
+				.coGroup(set2.partitionByHash(2, 1)
+						.map(new MockMapper())
+						.withConstantSet("2;1"))
+				.where(0, 1).equalTo(2, 1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+	}
+
+	@Test
+	public void reuseBothPartitioningCoGroup4() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.partitionByHash(0,2)
+				.map(new MockMapper()).withConstantSet("0;2")
+				.coGroup(set2.partitionByHash(1)
+						.map(new MockMapper())
+						.withConstantSet("1"))
+				.where(0, 2).equalTo(2, 1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+	}
+
+	@Test
+	public void reuseBothPartitioningCoGroup5() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.partitionByHash(2)
+				.map(new MockMapper()).withConstantSet("2")
+				.coGroup(set2.partitionByHash(1)
+						.map(new MockMapper())
+						.withConstantSet("1"))
+				.where(0, 2).equalTo(2, 1).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+	}
+
+	@Test
+	public void reuseBothPartitioningCoGroup6() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.partitionByHash(2)
+				.map(new MockMapper()).withConstantSet("2")
+				.coGroup(set2.partitionByHash(2)
+						.map(new MockMapper())
+						.withConstantSet("2"))
+				.where(0, 2).equalTo(1, 2).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+	}
+
+	@Test
+	public void reuseBothPartitioningCoGroup7() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = set1
+				.partitionByHash(2)
+				.map(new MockMapper()).withConstantSet("2")
+				.coGroup(set2.partitionByHash(1)
+						.map(new MockMapper())
+						.withConstantSet("1"))
+				.where(0, 2).equalTo(1, 2).with(new MockCoGroup());
+
+		coGrouped.print();
+		JavaPlan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+
+		checkValidCoGroupInputProperties(coGroup);
+	}
+
+
+
+	private void checkValidJoinInputProperties(DualInputPlanNode join) {
+
+		GlobalProperties inProps1 = join.getInput1().getGlobalProperties();
+		GlobalProperties inProps2 = join.getInput2().getGlobalProperties();
+
+		if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+				inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
+
+			// check that both inputs are hash partitioned on the same fields
+			FieldList pFields1 = inProps1.getPartitioningFields();
+			FieldList pFields2 = inProps2.getPartitioningFields();
+
+			assertTrue("Inputs are not partitioned on the same number of fields. Input 1: "+pFields1+", Input 2: "+pFields2,
+					pFields1.size() == pFields2.size());
+
+			FieldList reqPFields1 = join.getKeysForInput1();
+			FieldList reqPFields2 = join.getKeysForInput2();
+
+			for(int i=0; i<pFields1.size(); i++) {
+
+				// get fields
+				int f1 = pFields1.get(i);
+				int f2 = pFields2.get(i);
+
+				// check that field positions in original key field list are identical
+				int pos1 = getPosInFieldList(f1, reqPFields1);
+				int pos2 = getPosInFieldList(f2, reqPFields2);
+
+				if(pos1 < 0) {
+					fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1);
+				}
+				if(pos2 < 0) {
+					fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2);
+				}
+				if(pos1 != pos2) {
+					fail("Inputs are not partitioned on the same key fields");
+				}
+			}
+
+		}
+		else if(inProps1.getPartitioning() == PartitioningProperty.FULL_REPLICATION &&
+				inProps2.getPartitioning() == PartitioningProperty.RANDOM) {
+			// we are good. No need to check for fields
+		}
+		else if(inProps1.getPartitioning() == PartitioningProperty.RANDOM &&
+				inProps2.getPartitioning() == PartitioningProperty.FULL_REPLICATION) {
+			// we are good. No need to check for fields
+		}
+		else {
+			throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned or fully replicated join inputs");
+		}
+
+	}
+
+	private void checkValidCoGroupInputProperties(DualInputPlanNode coGroup) {
+
+		GlobalProperties inProps1 = coGroup.getInput1().getGlobalProperties();
+		GlobalProperties inProps2 = coGroup.getInput2().getGlobalProperties();
+
+		if(inProps1.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+				inProps2.getPartitioning() == PartitioningProperty.HASH_PARTITIONED) {
+
+			// check that both inputs are hash partitioned on the same fields
+			FieldList pFields1 = inProps1.getPartitioningFields();
+			FieldList pFields2 = inProps2.getPartitioningFields();
+
+			assertTrue("Inputs are not partitioned on the same number of fields. Input 1: "+pFields1+", Input 2: "+pFields2,
+					pFields1.size() == pFields2.size());
+
+			FieldList reqPFields1 = coGroup.getKeysForInput1();
+			FieldList reqPFields2 = coGroup.getKeysForInput2();
+
+			for(int i=0; i<pFields1.size(); i++) {
+
+				// get fields
+				int f1 = pFields1.get(i);
+				int f2 = pFields2.get(i);
+
+				// check that field positions in original key field list are identical
+				int pos1 = getPosInFieldList(f1, reqPFields1);
+				int pos2 = getPosInFieldList(f2, reqPFields2);
+
+				if(pos1 < 0) {
+					fail("Input 1 is partitioned on field "+f1+" which is not contained in the key set "+reqPFields1);
+				}
+				if(pos2 < 0) {
+					fail("Input 2 is partitioned on field "+f2+" which is not contained in the key set "+reqPFields2);
+				}
+				if(pos1 != pos2) {
+					fail("Inputs are not partitioned on the same key fields");
+				}
+			}
+
+		}
+		else {
+			throw new UnsupportedOperationException("This method has only been implemented to check for hash partitioned coGroup inputs");
+		}
+
+	}
+
+	private int getPosInFieldList(int field, FieldList list) {
+
+		int pos;
+		for(pos=0; pos<list.size(); pos++) {
+			if(field == list.get(pos)) {
+				break;
+			}
+		}
+		if(pos == list.size()) {
+			return -1;
+		} else {
+			return pos;
+		}
+
+	}
+
+
+
+	public static class MockMapper implements MapFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+		@Override
+		public Tuple3<Integer, Integer, Integer> map(Tuple3<Integer, Integer, Integer> value) throws Exception {
+			return null;
+		}
+	}
+
+	public static class MockJoin implements JoinFunction<Tuple3<Integer, Integer, Integer>,
+			Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+		@Override
+		public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
+			return null;
+		}
+	}
+
+	public static class MockCoGroup implements CoGroupFunction<Tuple3<Integer, Integer, Integer>,
+				Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+		@Override
+		public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> first, Iterable<Tuple3<Integer, Integer, Integer>> second,
+							Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
+
+		}
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/88c7ea25/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
index fd4ad82..e810c67 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/dataproperties/GlobalPropertiesMatchingTest.java
@@ -41,30 +41,34 @@ public class GlobalPropertiesMatchingTest {
 				GlobalProperties gp1 = new GlobalProperties();
 				gp1.setAnyPartitioning(new FieldList(2, 6));
 				assertTrue(req.isMetBy(gp1));
-				
+
 				GlobalProperties gp2 = new GlobalProperties();
 				gp2.setAnyPartitioning(new FieldList(6, 2));
 				assertTrue(req.isMetBy(gp2));
-				
+
 				GlobalProperties gp3 = new GlobalProperties();
-				gp3.setAnyPartitioning(new FieldList(6, 1));
+				gp3.setAnyPartitioning(new FieldList(6, 2, 4));
 				assertFalse(req.isMetBy(gp3));
-				
+
 				GlobalProperties gp4 = new GlobalProperties();
-				gp4.setAnyPartitioning(new FieldList(2));
-				assertTrue(req.isMetBy(gp4));
+				gp4.setAnyPartitioning(new FieldList(6, 1));
+				assertFalse(req.isMetBy(gp4));
+
+				GlobalProperties gp5 = new GlobalProperties();
+				gp5.setAnyPartitioning(new FieldList(2));
+				assertTrue(req.isMetBy(gp5));
 			}
-			
+
 			// match hash partitioning
 			{
 				GlobalProperties gp1 = new GlobalProperties();
 				gp1.setHashPartitioned(new FieldList(2, 6));
 				assertTrue(req.isMetBy(gp1));
-				
+
 				GlobalProperties gp2 = new GlobalProperties();
 				gp2.setHashPartitioned(new FieldList(6, 2));
 				assertTrue(req.isMetBy(gp2));
-				
+
 				GlobalProperties gp3 = new GlobalProperties();
 				gp3.setHashPartitioned(new FieldList(6, 1));
 				assertFalse(req.isMetBy(gp3));
@@ -153,6 +157,136 @@ public class GlobalPropertiesMatchingTest {
 			fail(e.getMessage());
 		}
 	}
+
+	/**
+	 * Checks a request for "any partitioning" on the ordered field list (6, 2).
+	 *
+	 * As the assertions spell out, the request is expected to be met only by properties that
+	 * are partitioned (any, hash or range) on exactly the fields 6 and 2 in that order.
+	 * Swapped, extended, shortened or otherwise different field lists must be rejected.
+	 */
+	@Test
+	public void testStrictlyMatchingAnyPartitioning() {
+
+		RequestedGlobalProperties req = new RequestedGlobalProperties();
+		req.setAnyPartitioning(new FieldList(6, 2));
+
+		// match any partitioning
+		{
+			// identical fields in identical order
+			GlobalProperties gp1 = new GlobalProperties();
+			gp1.setAnyPartitioning(new FieldList(6, 2));
+			assertTrue(req.isMetBy(gp1));
+
+			// same fields, swapped order
+			GlobalProperties gp2 = new GlobalProperties();
+			gp2.setAnyPartitioning(new FieldList(2, 6));
+			assertFalse(req.isMetBy(gp2));
+
+			// requested fields plus an additional one
+			GlobalProperties gp3 = new GlobalProperties();
+			gp3.setAnyPartitioning(new FieldList(6, 2, 3));
+			assertFalse(req.isMetBy(gp3));
+
+			// second field differs; each case uses its own freshly created properties object
+			GlobalProperties gp4 = new GlobalProperties();
+			gp4.setAnyPartitioning(new FieldList(6, 1));
+			assertFalse(req.isMetBy(gp4));
+
+			// only one of the requested fields
+			GlobalProperties gp5 = new GlobalProperties();
+			gp5.setAnyPartitioning(new FieldList(2));
+			assertFalse(req.isMetBy(gp5));
+		}
+
+		// match hash partitioning
+		{
+			GlobalProperties gp1 = new GlobalProperties();
+			gp1.setHashPartitioned(new FieldList(6, 2));
+			assertTrue(req.isMetBy(gp1));
+
+			GlobalProperties gp2 = new GlobalProperties();
+			gp2.setHashPartitioned(new FieldList(2, 6));
+			assertFalse(req.isMetBy(gp2));
+
+			GlobalProperties gp3 = new GlobalProperties();
+			gp3.setHashPartitioned(new FieldList(6, 1));
+			assertFalse(req.isMetBy(gp3));
+		}
+
+		// match range partitioning
+		{
+			GlobalProperties gp1 = new GlobalProperties();
+			gp1.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+			assertTrue(req.isMetBy(gp1));
+
+			GlobalProperties gp2 = new GlobalProperties();
+			gp2.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+			assertFalse(req.isMetBy(gp2));
+
+			GlobalProperties gp3 = new GlobalProperties();
+			gp3.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+			assertFalse(req.isMetBy(gp3));
+
+			GlobalProperties gp4 = new GlobalProperties();
+			gp4.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
+			assertFalse(req.isMetBy(gp4));
+		}
+
+	}
+
+	/**
+	 * Checks a request for hash partitioning on the ordered field list (6, 2).
+	 *
+	 * According to the assertions, the only properties expected to meet the request are
+	 * hash-partitioned ones on exactly the fields 6 and 2 in that order. "Any" partitioned and
+	 * range partitioned properties are expected to be rejected regardless of their fields.
+	 */
+	@Test
+	public void testStrictlyMatchingHashPartitioning() {
+
+		RequestedGlobalProperties req = new RequestedGlobalProperties();
+		req.setHashPartitioned(new FieldList(6, 2));
+
+		// properties with "any" partitioning: none of them is expected to meet the request
+		{
+			GlobalProperties sameOrder = new GlobalProperties();
+			sameOrder.setAnyPartitioning(new FieldList(6, 2));
+			assertFalse(req.isMetBy(sameOrder));
+
+			GlobalProperties swappedOrder = new GlobalProperties();
+			swappedOrder.setAnyPartitioning(new FieldList(2, 6));
+			assertFalse(req.isMetBy(swappedOrder));
+
+			GlobalProperties otherField = new GlobalProperties();
+			otherField.setAnyPartitioning(new FieldList(6, 1));
+			assertFalse(req.isMetBy(otherField));
+
+			GlobalProperties singleField = new GlobalProperties();
+			singleField.setAnyPartitioning(new FieldList(2));
+			assertFalse(req.isMetBy(singleField));
+		}
+
+		// hash partitioned properties: only the identical field list is expected to match
+		{
+			GlobalProperties sameOrder = new GlobalProperties();
+			sameOrder.setHashPartitioned(new FieldList(6, 2));
+			assertTrue(req.isMetBy(sameOrder));
+
+			GlobalProperties swappedOrder = new GlobalProperties();
+			swappedOrder.setHashPartitioned(new FieldList(2, 6));
+			assertFalse(req.isMetBy(swappedOrder));
+
+			GlobalProperties otherField = new GlobalProperties();
+			otherField.setHashPartitioned(new FieldList(6, 1));
+			assertFalse(req.isMetBy(otherField));
+
+			GlobalProperties extraField = new GlobalProperties();
+			extraField.setHashPartitioned(new FieldList(6, 2, 0));
+			assertFalse(req.isMetBy(extraField));
+		}
+
+		// range partitioned properties: none of them is expected to meet the request
+		{
+			GlobalProperties sameOrder = new GlobalProperties();
+			sameOrder.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(2, null, Order.ASCENDING));
+			assertFalse(req.isMetBy(sameOrder));
+
+			GlobalProperties swappedOrder = new GlobalProperties();
+			swappedOrder.setRangePartitioned(new Ordering(2, null, Order.DESCENDING).appendOrdering(6, null, Order.ASCENDING));
+			assertFalse(req.isMetBy(swappedOrder));
+
+			GlobalProperties otherField = new GlobalProperties();
+			otherField.setRangePartitioned(new Ordering(6, null, Order.DESCENDING).appendOrdering(1, null, Order.ASCENDING));
+			assertFalse(req.isMetBy(otherField));
+
+			GlobalProperties firstFieldOnly = new GlobalProperties();
+			firstFieldOnly.setRangePartitioned(new Ordering(6, null, Order.DESCENDING));
+			assertFalse(req.isMetBy(firstFieldOnly));
+		}
+
+	}
 	
 	// --------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88c7ea25/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
index 1a76d49..ae0a722 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
@@ -117,7 +117,7 @@ public class FieldList extends FieldSet {
 	public FieldList toFieldList() {
 		return this;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
@@ -158,6 +158,19 @@ public class FieldList extends FieldSet {
 		}
 		return true;
 	}
+
+	/**
+	 * Checks whether the given list contains exactly the same fields as this list,
+	 * at exactly the same positions.
+	 *
+	 * @param list the field list to compare against
+	 * @return {@code true} if both lists have the same size and equal entries at every
+	 *         position, {@code false} otherwise
+	 */
+	public boolean isExactMatch(FieldList list) {
+		if (this.size() != list.size()) {
+			return false;
+		}
+		for (int i = 0; i < this.size(); i++) {
+			// Compare as primitive ints. NOTE(review): the return type of get(int) is not
+			// visible in this diff; if it is a boxed Integer, applying '!=' directly to the
+			// two results would compare object references rather than values.
+			final int mine = this.get(i);
+			final int theirs = list.get(i);
+			if (mine != theirs) {
+				return false;
+			}
+		}
+		return true;
+	}
 	
 	// --------------------------------------------------------------------------------------------
 


Mime
View raw message