flink-commits mailing list archives

From fhue...@apache.org
Subject [2/2] flink git commit: [Refactor] [DataSet] Refactor key selector translation in DataSet API. Clean up several compiler warnings.
Date Tue, 19 Jan 2016 10:57:44 GMT
[Refactor] [DataSet] Refactor key selector translation in DataSet API.
Clean up several compiler warnings.

This closes #1509


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/544abb93
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/544abb93
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/544abb93

Branch: refs/heads/master
Commit: 544abb937120b72dfd7a4e1cbb40c48222e578d4
Parents: 153a678
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Jan 13 00:22:39 2016 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Jan 19 11:56:38 2016 +0100

----------------------------------------------------------------------
 .../api/java/operators/CoGroupOperator.java     | 174 +++++++------------
 .../api/java/operators/DistinctOperator.java    |  73 ++++----
 .../java/operators/GroupCombineOperator.java    | 105 +++++------
 .../api/java/operators/GroupReduceOperator.java | 117 ++++++-------
 .../flink/api/java/operators/JoinOperator.java  | 114 +++++-------
 .../apache/flink/api/java/operators/Keys.java   | 148 +++++++++++++---
 .../api/java/operators/PartitionOperator.java   |  79 ++++-----
 .../api/java/operators/ReduceOperator.java      |  61 +++----
 .../api/java/operators/SortedGrouping.java      |  11 ++
 9 files changed, 422 insertions(+), 460 deletions(-)
----------------------------------------------------------------------
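
The change repeated across the files below replaces the hand-rolled key-extraction wiring in every translateSelectorFunction* method (a KeyExtractingMapper wrapped in a MapOperatorBase, a manually assembled TupleTypeInfo, and explicit input/parallelism wiring) with two static helpers on Keys.SelectorFunctionKeys. Below is a minimal before/after sketch of that pattern, assuming the helper signatures as they appear at the call sites in this diff; the Keys.java hunks that implement them are not shown in this message, the sketch's class and method names are illustrative only, and it is placed in the same package so it can reach the same helpers.

package org.apache.flink.api.java.operators;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

class KeySelectorTranslationSketch {

	// Old pattern: every operator built and wired the key-extracting map itself.
	static <IN, K> Operator<Tuple2<K, IN>> keyedInputBefore(
			Operator<IN> input, SelectorFunctionKeys<IN, K> keys, TypeInformation<IN> inputType) {

		TypeInformation<Tuple2<K, IN>> typeInfoWithKey =
				new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
		KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
		MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> keyMapper =
				new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(
						extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
		keyMapper.setInput(input);
		// parallelism must match the input so the extractor chains
		keyMapper.setParallelism(input.getParallelism());
		return keyMapper;
	}

	// New pattern: one helper call appends the extractor and handles the wiring;
	// the matching Tuple2 type comes from SelectorFunctionKeys.createTypeWithKey(keys).
	static <IN, K> Operator<Tuple2<K, IN>> keyedInputAfter(
			Operator<IN> input, SelectorFunctionKeys<IN, K> keys) {
		return SelectorFunctionKeys.appendKeyExtractor(input, keys);
	}
}

The accompanying warning cleanup is mostly the switch to the diamond operator and the consolidated @SuppressWarnings annotations visible throughout the hunks below.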


http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 16c2bf6..ca41fc5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -27,16 +27,13 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
@@ -46,13 +43,12 @@ import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanBothUnwrappingCoGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanLeftUnwrappingCoGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanRightUnwrappingCoGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 /**
@@ -107,7 +103,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		// sanity check solution set key mismatches
 		if (input1 instanceof SolutionSetPlaceHolder) {
 			if (keys1 instanceof ExpressionKeys) {
-				int[] positions = ((ExpressionKeys<?>) keys1).computeLogicalKeyPositions();
+				int[] positions = keys1.computeLogicalKeyPositions();
 				((SolutionSetPlaceHolder<?>) input1).checkJoinKeyFields(positions);
 			} else {
 				throw new InvalidProgramException("Currently, the solution set may only be CoGrouped with using tuple field positions.");
@@ -115,7 +111,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		}
 		if (input2 instanceof SolutionSetPlaceHolder) {
 			if (keys2 instanceof ExpressionKeys) {
-				int[] positions = ((ExpressionKeys<?>) keys2).computeLogicalKeyPositions();
+				int[] positions = keys2.computeLogicalKeyPositions();
 				((SolutionSetPlaceHolder<?>) input2).checkJoinKeyFields(positions);
 			} else {
 				throw new InvalidProgramException("Currently, the solution set may only be CoGrouped with using tuple field positions.");
@@ -140,15 +136,15 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 		// offset semantic information by extracted key fields
 		if(props != null &&
-					(this.keys1 instanceof Keys.SelectorFunctionKeys ||
-					this.keys2 instanceof Keys.SelectorFunctionKeys)) {
+					(this.keys1 instanceof SelectorFunctionKeys ||
+					this.keys2 instanceof SelectorFunctionKeys)) {
 
 			int numFields1 = this.getInput1Type().getTotalFields();
 			int numFields2 = this.getInput2Type().getTotalFields();
-			int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
-					((Keys.SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
-			int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
-					((Keys.SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
+			int offset1 = (this.keys1 instanceof SelectorFunctionKeys) ?
+					((SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
+			int offset2 = (this.keys2 instanceof SelectorFunctionKeys) ?
+					((SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
 
 			props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
 		}
@@ -205,44 +201,44 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		
 		final org.apache.flink.api.common.operators.base.CoGroupOperatorBase<?, ?, OUT, ?> po;
 
-		if (keys1 instanceof Keys.SelectorFunctionKeys
-				&& keys2 instanceof Keys.SelectorFunctionKeys) {
+		if (keys1 instanceof SelectorFunctionKeys
+				&& keys2 instanceof SelectorFunctionKeys) {
 
 			@SuppressWarnings("unchecked")
-			Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 =
-					(Keys.SelectorFunctionKeys<I1, ?>) keys1;
+			SelectorFunctionKeys<I1, ?> selectorKeys1 =
+					(SelectorFunctionKeys<I1, ?>) keys1;
 			@SuppressWarnings("unchecked")
-			Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 =
-					(Keys.SelectorFunctionKeys<I2, ?>) keys2;
+			SelectorFunctionKeys<I2, ?> selectorKeys2 =
+					(SelectorFunctionKeys<I2, ?>) keys2;
 
 			po = translateSelectorFunctionCoGroup(selectorKeys1, selectorKeys2, function,
-					getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
+					getResultType(), name, input1, input2);
 
 			po.setParallelism(getParallelism());
 			po.setCustomPartitioner(customPartitioner);
 		}
-		else if (keys2 instanceof Keys.SelectorFunctionKeys) {
+		else if (keys2 instanceof SelectorFunctionKeys) {
 
 			int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions();
 
 			@SuppressWarnings("unchecked")
-			Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+			SelectorFunctionKeys<I2, ?> selectorKeys2 = (SelectorFunctionKeys<I2, ?>) keys2;
 
 			po = translateSelectorFunctionCoGroupRight(logicalKeyPositions1, selectorKeys2, function,
-							getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
+							getInput1Type(), getResultType(), name, input1, input2);
 
 			po.setParallelism(getParallelism());
 			po.setCustomPartitioner(customPartitioner);
 		}
-		else if (keys1 instanceof Keys.SelectorFunctionKeys) {
+		else if (keys1 instanceof SelectorFunctionKeys) {
 
 			@SuppressWarnings("unchecked")
-			Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
+			SelectorFunctionKeys<I1, ?> selectorKeys1 = (SelectorFunctionKeys<I1, ?>) keys1;
 
 			int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions();
 
 			po = translateSelectorFunctionCoGroupLeft(selectorKeys1, logicalKeyPositions2, function,
-							getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
+							getInput2Type(), getResultType(), name, input1, input2);
 		}
 		else if ( keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys)
 			{
@@ -256,8 +252,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions();
 			
 			CoGroupOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>> op =
-					new CoGroupOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>>(
-							function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()),
+					new CoGroupOperatorBase<>(
+							function, new BinaryOperatorInformation<>(getInput1Type(), getInput2Type(), getResultType()),
 							logicalKeyPositions1, logicalKeyPositions2, name);
 			
 			op.setFirstInput(input1);
@@ -292,44 +288,35 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 
 	private static <I1, I2, K, OUT> PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroup(
-			Keys.SelectorFunctionKeys<I1, ?> rawKeys1, Keys.SelectorFunctionKeys<I2, ?> rawKeys2,
+			SelectorFunctionKeys<I1, ?> rawKeys1, SelectorFunctionKeys<I2, ?> rawKeys2,
 			CoGroupFunction<I1, I2, OUT> function,
-			TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
+			TypeInformation<OUT> outputType, String name,
 			Operator<I1> input1, Operator<I2> input2)
 	{
 		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<I1, K> keys1 = (Keys.SelectorFunctionKeys<I1, K>) rawKeys1;
+		final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
 		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<I2, K> keys2 = (Keys.SelectorFunctionKeys<I2, K>) rawKeys2;
+		final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2;
 
-		final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = new TupleTypeInfo<Tuple2<K, I1>>(keys1.getKeyType(), inputType1);
-		final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = new TupleTypeInfo<Tuple2<K, I2>>(keys2.getKeyType(), inputType2);
+		final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1);
+		final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2);
 
-		final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
-		final KeyExtractingMapper<I2, K> extractor2 = new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
-		
-		final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
-				new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(extractor1, new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1), "Key Extractor 1");
-		final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
-				new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(extractor2, new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2), "Key Extractor 2");
-		final PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup = new PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
+		final Operator<Tuple2<K, I1>> keyedInput1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1);
+		final Operator<Tuple2<K, I2>> keyedInput2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2);
 
-		cogroup.setFirstInput(keyMapper1);
-		cogroup.setSecondInput(keyMapper2);
+		final PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
+			new PlanBothUnwrappingCoGroupOperator<>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
 
-		keyMapper1.setInput(input1);
-		keyMapper2.setInput(input2);
-		// set parallelism
-		keyMapper1.setParallelism(input1.getParallelism());
-		keyMapper2.setParallelism(input2.getParallelism());
+		cogroup.setFirstInput(keyedInput1);
+		cogroup.setSecondInput(keyedInput2);
 
 		return cogroup;
 	}
 
 	private static <I1, I2, K, OUT> PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroupRight(
-			int[] logicalKeyPositions1, Keys.SelectorFunctionKeys<I2, ?> rawKeys2,
+			int[] logicalKeyPositions1, SelectorFunctionKeys<I2, ?> rawKeys2,
 			CoGroupFunction<I1, I2, OUT> function,
-			TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
+			TypeInformation<I1> inputType1, TypeInformation<OUT> outputType, String name,
 			Operator<I1> input1, Operator<I2> input2)
 	{
 		if(!inputType1.isTupleType()) {
@@ -337,22 +324,12 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		}
 
 		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<I2, K> keys2 = (Keys.SelectorFunctionKeys<I2, K>) rawKeys2;
-
-		final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 =
-				new TupleTypeInfo<Tuple2<K, I2>>(keys2.getKeyType(), inputType2);
-
-		final KeyExtractingMapper<I2, K> extractor2 =
-				new KeyExtractingMapper<I2, K>(keys2.getKeyExtractor());
-
-		final MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
-				new MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>>(
-						extractor2,
-						new UnaryOperatorInformation<I2, Tuple2<K, I2>>(inputType2, typeInfoWithKey2),
-						"Key Extractor 2");
+		final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2;
+		final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2);
+		final Operator<Tuple2<K, I2>> keyedInput2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2);
 		
 		final PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
-				new PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>(
+				new PlanRightUnwrappingCoGroupOperator<>(
 						function,
 						logicalKeyPositions1,
 						keys2,
@@ -362,19 +339,15 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 						typeInfoWithKey2);
 
 		cogroup.setFirstInput(input1);
-		cogroup.setSecondInput(keyMapper2);
-
-		keyMapper2.setInput(input2);
-		// set parallelism
-		keyMapper2.setParallelism(input2.getParallelism());
+		cogroup.setSecondInput(keyedInput2);
 
 		return cogroup;
 	}
 
 	private static <I1, I2, K, OUT> PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroupLeft(
-			Keys.SelectorFunctionKeys<I1, ?> rawKeys1, int[] logicalKeyPositions2,
+			SelectorFunctionKeys<I1, ?> rawKeys1, int[] logicalKeyPositions2,
 			CoGroupFunction<I1, I2, OUT> function,
-			TypeInformation<I1> inputType1, TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
+			TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
 			Operator<I1> input1, Operator<I2> input2)
 	{
 		if(!inputType2.isTupleType()) {
@@ -382,21 +355,12 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		}
 
 		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<I1, K> keys1 = (Keys.SelectorFunctionKeys<I1, K>) rawKeys1;
-
-		final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 =
-				new TupleTypeInfo<Tuple2<K, I1>>(keys1.getKeyType(), inputType1);
-
-		final KeyExtractingMapper<I1, K> extractor1 = new KeyExtractingMapper<I1, K>(keys1.getKeyExtractor());
-
-		final MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
-				new MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>>(
-						extractor1,
-						new UnaryOperatorInformation<I1, Tuple2<K, I1>>(inputType1, typeInfoWithKey1),
-						"Key Extractor 1");
+		final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
+		final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1);
+		final Operator<Tuple2<K, I1>> keyedInput1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1);
 
 		final PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
-				new PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K>(
+				new PlanLeftUnwrappingCoGroupOperator<>(
 						function,
 						keys1,
 						logicalKeyPositions2,
@@ -405,13 +369,9 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 						typeInfoWithKey1,
 						inputType2);
 
-		cogroup.setFirstInput(keyMapper1);
+		cogroup.setFirstInput(keyedInput1);
 		cogroup.setSecondInput(input2);
 
-		keyMapper1.setInput(input1);
-		// set parallelism
-		keyMapper1.setParallelism(input1.getParallelism());
-
 		return cogroup;
 	}
 
@@ -455,7 +415,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		 * @see DataSet
 		 */
 		public CoGroupOperatorSetsPredicate where(int... fields) {
-			return new CoGroupOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType()));
+			return new CoGroupOperatorSetsPredicate(new Keys.ExpressionKeys<>(fields, input1.getType()));
 		}
 
 		/**
@@ -472,7 +432,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		 * @see DataSet
 		 */
 		public CoGroupOperatorSetsPredicate where(String... fields) {
-			return new CoGroupOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(fields, input1.getType()));
+			return new CoGroupOperatorSetsPredicate(new Keys.ExpressionKeys<>(fields, input1.getType()));
 		}
 
 		/**
@@ -489,7 +449,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		 */
 		public <K> CoGroupOperatorSetsPredicate where(KeySelector<I1, K> keyExtractor) {
 			TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input1.getType());
-			return new CoGroupOperatorSetsPredicate(new Keys.SelectorFunctionKeys<I1, K>(keyExtractor, input1.getType(), keyType));
+			return new CoGroupOperatorSetsPredicate(new SelectorFunctionKeys<>(keyExtractor, input1.getType(), keyType));
 		}
 
 		// ----------------------------------------------------------------------------------------
@@ -527,7 +487,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			 *           Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)} to finalize the CoGroup transformation.
 			 */
 			public CoGroupOperatorWithoutFunction equalTo(int... fields) {
-				return createCoGroupOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType()));
+				return createCoGroupOperator(new Keys.ExpressionKeys<>(fields, input2.getType()));
 			}
 
 			/**
@@ -540,7 +500,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			 *           Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.common.functions.CoGroupFunction)} to finalize the CoGroup transformation.
 			 */
 			public CoGroupOperatorWithoutFunction equalTo(String... fields) {
-				return createCoGroupOperator(new Keys.ExpressionKeys<I2>(fields, input2.getType()));
+				return createCoGroupOperator(new Keys.ExpressionKeys<>(fields, input2.getType()));
 			}
 
 			/**
@@ -554,7 +514,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			 */
 			public <K> CoGroupOperatorWithoutFunction equalTo(KeySelector<I2, K> keyExtractor) {
 				TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input2.getType());
-				return createCoGroupOperator(new Keys.SelectorFunctionKeys<I2, K>(keyExtractor, input2.getType(), keyType));
+				return createCoGroupOperator(new SelectorFunctionKeys<>(keyExtractor, input2.getType(), keyType));
 			}
 
 			/**
@@ -601,8 +561,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 					this.keys2 = keys2;
 					
-					this.groupSortKeyOrderFirst = new ArrayList<Pair<Integer, Order>>();
-					this.groupSortKeyOrderSecond = new ArrayList<Pair<Integer, Order>>();
+					this.groupSortKeyOrderFirst = new ArrayList<>();
+					this.groupSortKeyOrderSecond = new ArrayList<>();
 				}
 				
 				/**
@@ -650,7 +610,7 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 					TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType(),
 							Utils.getCallLocationName(), true);
 					
-					return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, input1.clean(function), returnType,
+					return new CoGroupOperator<>(input1, input2, keys1, keys2, input1.clean(function), returnType,
 							groupSortKeyOrderFirst, groupSortKeyOrderSecond,
 							customPartitioner, Utils.getCallLocationName());
 				}
@@ -679,11 +639,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 					if (field >= input1.getType().getArity()) {
 						throw new IllegalArgumentException("Order key out of tuple bounds.");
 					}
-					ExpressionKeys<I1> ek = new ExpressionKeys<I1>(new int[]{field}, input1.getType());
+					ExpressionKeys<I1> ek = new ExpressionKeys<>(new int[]{field}, input1.getType());
 					int[] groupOrderKeys = ek.computeLogicalKeyPositions();
 					
 					for (int key : groupOrderKeys) {
-						this.groupSortKeyOrderFirst.add(new ImmutablePair<Integer, Order>(key, order));
+						this.groupSortKeyOrderFirst.add(new ImmutablePair<>(key, order));
 					}
 					
 					return this;
@@ -709,11 +669,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 					if (field >= input2.getType().getArity()) {
 						throw new IllegalArgumentException("Order key out of tuple bounds.");
 					}
-					ExpressionKeys<I2> ek = new ExpressionKeys<I2>(new int[]{field}, input2.getType());
+					ExpressionKeys<I2> ek = new ExpressionKeys<>(new int[]{field}, input2.getType());
 					int[] groupOrderKeys = ek.computeLogicalKeyPositions();
 					
 					for (int key : groupOrderKeys) {
-						this.groupSortKeyOrderSecond.add(new ImmutablePair<Integer, Order>(key, order));
+						this.groupSortKeyOrderSecond.add(new ImmutablePair<>(key, order));
 					}
 					
 					return this;
@@ -734,11 +694,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 					if (! (input1.getType() instanceof CompositeType)) {
 						throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)");
 					}
-					ExpressionKeys<I1> ek = new ExpressionKeys<I1>(new String[]{fieldExpression}, input1.getType());
+					ExpressionKeys<I1> ek = new ExpressionKeys<>(new String[]{fieldExpression}, input1.getType());
 					int[] groupOrderKeys = ek.computeLogicalKeyPositions();
 					
 					for (int key : groupOrderKeys) {
-						this.groupSortKeyOrderFirst.add(new ImmutablePair<Integer, Order>(key, order));
+						this.groupSortKeyOrderFirst.add(new ImmutablePair<>(key, order));
 					}
 					
 					return this;
@@ -759,11 +719,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 					if (! (input2.getType() instanceof CompositeType)) {
 						throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)");
 					}
-					ExpressionKeys<I2> ek = new ExpressionKeys<I2>(new String[]{fieldExpression}, input2.getType());
+					ExpressionKeys<I2> ek = new ExpressionKeys<>(new String[]{fieldExpression}, input2.getType());
 					int[] groupOrderKeys = ek.computeLogicalKeyPositions();
 					
 					for (int key : groupOrderKeys) {
-						this.groupSortKeyOrderSecond.add(new ImmutablePair<Integer, Order>(key, order));
+						this.groupSortKeyOrderSecond.add(new ImmutablePair<>(key, order));
 					}
 					
 					return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index ad2335b..d1d208a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -19,31 +19,28 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.DataSet;
 
 /**
  * This operator represents the application of a "distinct" function on a data set, and the
  * result data set produced by the function.
- * 
+ *
  * @param <T> The type of the data set made distinct by the operator.
  */
 public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOperator<T>> {
-	
+
 	private final Keys<T> keys;
-	
+
 	private final String distinctLocationName;
 
 	public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
@@ -53,7 +50,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 
 		// if keys is null distinction is done on all fields
 		if (keys == null) {
-			keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
+			keys = new Keys.ExpressionKeys<>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
 		}
 
 		this.keys = keys;
@@ -61,79 +58,71 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 
 	@Override
 	protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) {
-		
-		final RichGroupReduceFunction<T, T> function = new DistinctFunction<T>();
+
+		final RichGroupReduceFunction<T, T> function = new DistinctFunction<>();
 
 		String name = getName() != null ? getName() : "Distinct at " + distinctLocationName;
-		
+
 		if (keys instanceof Keys.ExpressionKeys) {
 
 			int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
-			UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getInputType(), getResultType());
+			UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
 			GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>> po =
 					new GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>>(function, operatorInfo, logicalKeyPositions, name);
 
 			po.setCombinable(true);
 			po.setInput(input);
 			po.setParallelism(getParallelism());
-			
+
 			// make sure that distinct preserves the partitioning for the fields on which they operate
 			if (getType().isTupleType()) {
 				SingleInputSemanticProperties sProps = new SingleInputSemanticProperties();
-				
+
 				for (int field : keys.computeLogicalKeyPositions()) {
 					sProps.addForwardedField(field, field);
 				}
-				
+
 				po.setSemanticProperties(sProps);
 			}
-			
-			
+
 			return po;
 		}
-		else if (keys instanceof Keys.SelectorFunctionKeys) {
-		
-			@SuppressWarnings("unchecked")
-			Keys.SelectorFunctionKeys<T, ?> selectorKeys = (Keys.SelectorFunctionKeys<T, ?>) keys;
+		else if (keys instanceof SelectorFunctionKeys) {
 
+			@SuppressWarnings("unchecked")
+			SelectorFunctionKeys<T, ?> selectorKeys = (SelectorFunctionKeys<T, ?>) keys;
 
 			PlanUnwrappingReduceGroupOperator<T, T, ?> po = translateSelectorFunctionDistinct(
-							selectorKeys, function, getInputType(), getResultType(), name, input);
-			
+							selectorKeys, function, getResultType(), name, input);
+
 			po.setParallelism(this.getParallelism());
-			
+
 			return po;
 		}
 		else {
 			throw new UnsupportedOperationException("Unrecognized key type.");
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct(
-			Keys.SelectorFunctionKeys<IN, ?> rawKeys, RichGroupReduceFunction<IN, OUT> function,
-			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
+			SelectorFunctionKeys<IN, ?> rawKeys,
+			RichGroupReduceFunction<IN, OUT> function,
+			TypeInformation<OUT> outputType,
+			String name,
+			Operator<IN> input)
 	{
 		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
+		final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
 		
-		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
+		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
+		Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
 		
-		KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
-
-
 		PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
-				new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, true);
-		
-		MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
+				new PlanUnwrappingReduceGroupOperator<>(function, keys, name, outputType, typeInfoWithKey, true);
+		reducer.setInput(keyedInput);
 
-		reducer.setInput(mapper);
-		mapper.setInput(input);
-		
-		// set the mapper's parallelism to the input parallelism to make sure it is chained
-		mapper.setParallelism(input.getParallelism());
-		
 		return reducer;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
index 30cb0be..6d02eca 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupCombineOperator.java
@@ -19,24 +19,20 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingGroupCombineOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedGroupCombineOperator;
-import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
+import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
 /**
  * This operator behaves like the GroupReduceOperator with Combine but only runs the Combine part which reduces all data
@@ -96,9 +92,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		// offset semantic information by extracted key fields
 		if(props != null &&
 				this.grouper != null &&
-				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
+				this.grouper.keys instanceof SelectorFunctionKeys) {
 
-			int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
+			int offset = ((SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
 			if(this.grouper instanceof SortedGrouping) {
 				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
@@ -121,9 +117,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		// distinguish between grouped reduce and non-grouped reduce
 		if (grouper == null) {
 			// non grouped reduce
-			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
 			GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po =
-					new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
+					new GroupCombineOperatorBase<>(function, operatorInfo, new int[0], name);
 
 			po.setInput(input);
 			// the parallelism for a non grouped reduce can only be 1
@@ -131,33 +127,25 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 			return po;
 		}
 
-		if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) {
+		if (grouper.getKeys() instanceof SelectorFunctionKeys) {
 
 			@SuppressWarnings("unchecked")
-			Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys();
+			SelectorFunctionKeys<IN, ?> selectorKeys = (SelectorFunctionKeys<IN, ?>) grouper.getKeys();
 
 			if (grouper instanceof SortedGrouping) {
-				SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
-				Keys.SelectorFunctionKeys<IN, ?> sortKeys = sortedGrouper.getSortSelectionFunctionKey();
 
-				PlanUnwrappingSortedGroupCombineOperator<IN, OUT, ?, ?> po = translateSelectorFunctionSortedReducer(
-						selectorKeys, sortKeys, function, getInputType(), getResultType(), name, input);
+				SortedGrouping<IN> sortedGrouping = (SortedGrouping<IN>) grouper;
+				SelectorFunctionKeys<IN, ?> sortKeys = sortedGrouping.getSortSelectionFunctionKey();
+				Ordering groupOrder = sortedGrouping.getGroupOrdering();
 
-				// set group order
-				int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
-				Order[] sortOrders = sortedGrouper.getGroupSortOrders();
-
-				Ordering o = new Ordering();
-				for(int i=0; i < sortKeyPositions.length; i++) {
-					o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
-				}
-				po.setGroupOrder(o);
+				PlanUnwrappingSortedGroupCombineOperator<IN, OUT, ?, ?> po =
+					translateSelectorFunctionSortedReducer(selectorKeys, sortKeys, groupOrder, function, getResultType(), name, input);
 
 				po.setParallelism(this.getParallelism());
 				return po;
 			} else {
 				PlanUnwrappingGroupCombineOperator<IN, OUT, ?> po = translateSelectorFunctionReducer(
-						selectorKeys, function, getInputType(), getResultType(), name, input);
+						selectorKeys, function, getResultType(), name, input);
 
 				po.setParallelism(this.getParallelism());
 				return po;
@@ -166,9 +154,9 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 		else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
 
 			int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
-			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
 			GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>> po =
-					new GroupCombineOperatorBase<IN, OUT, GroupCombineFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+					new GroupCombineOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
 
 			po.setInput(input);
 			po.setParallelism(getParallelism());
@@ -197,53 +185,46 @@ public class GroupCombineOperator<IN, OUT> extends SingleInputUdfOperator<IN, OU
 
 	// --------------------------------------------------------------------------------------------
 
+	@SuppressWarnings("unchecked")
 	private static <IN, OUT, K> PlanUnwrappingGroupCombineOperator<IN, OUT, K> translateSelectorFunctionReducer(
-			Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupCombineFunction<IN, OUT> function,
-			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
+			SelectorFunctionKeys<IN, ?> rawKeys,
+			GroupCombineFunction<IN, OUT> function,
+			TypeInformation<OUT> outputType,
+			String name,
+			Operator<IN> input)
 	{
-		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
+		final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
 
-		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
+		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
+		Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
 
-		KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
-
-		PlanUnwrappingGroupCombineOperator<IN, OUT, K> reducer = new PlanUnwrappingGroupCombineOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey);
-
-		MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
-
-		reducer.setInput(mapper);
-		mapper.setInput(input);
-
-		// set the mapper's parallelism to the input parallelism to make sure it is chained
-		mapper.setParallelism(input.getParallelism());
+		PlanUnwrappingGroupCombineOperator<IN, OUT, K> reducer =
+			new PlanUnwrappingGroupCombineOperator<>(function, keys, name, outputType, typeInfoWithKey);
+		reducer.setInput(keyedInput);
 
 		return reducer;
 	}
 
+	@SuppressWarnings("unchecked")
 	private static <IN, OUT, K1, K2> PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> translateSelectorFunctionSortedReducer(
-			Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, GroupCombineFunction<IN, OUT> function,
-			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
+			SelectorFunctionKeys<IN, ?> rawGroupingKey,
+			SelectorFunctionKeys<IN, ?> rawSortingKeys,
+			Ordering groupOrder,
+			GroupCombineFunction<IN, OUT> function,
+			TypeInformation<OUT> outputType,
+			String name,
+			Operator<IN> input)
 	{
-		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<IN, K1> groupingKey = (Keys.SelectorFunctionKeys<IN, K1>) rawGroupingKey;
-
-		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<IN, K2> sortingKey = (Keys.SelectorFunctionKeys<IN, K2>) rawSortingKey;
-
-		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple3<K1, K2, IN>>(groupingKey.getKeyType(), sortingKey.getKeyType(), inputType);
-
-		TwoKeyExtractingMapper<IN, K1, K2> extractor = new TwoKeyExtractingMapper<IN, K1, K2>(groupingKey.getKeyExtractor(), sortingKey.getKeyExtractor());
-
-		PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> reducer = new PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2>(function, groupingKey, sortingKey, name, outputType, typeInfoWithKey);
-
-		MapOperatorBase<IN, Tuple3<K1, K2, IN>, MapFunction<IN, Tuple3<K1, K2, IN>>> mapper = new MapOperatorBase<IN, Tuple3<K1, K2, IN>, MapFunction<IN, Tuple3<K1, K2, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple3<K1, K2, IN>>(inputType, typeInfoWithKey), "Key Extractor");
+		final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey;
+		final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>)rawSortingKeys;
+		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(groupingKey, sortingKey);
 
-		reducer.setInput(mapper);
-		mapper.setInput(input);
+		Operator<Tuple3<K1, K2, IN>> inputWithKey = SelectorFunctionKeys.appendKeyExtractor(input, groupingKey, sortingKey);
 
-		// set the mapper's parallelism to the input parallelism to make sure it is chained
-		mapper.setParallelism(input.getParallelism());
+		PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> reducer =
+			new PlanUnwrappingSortedGroupCombineOperator<>(function, groupingKey, sortingKey, name, outputType, typeInfoWithKey);
+		reducer.setInput(inputWithKey);
+		reducer.setGroupOrder(groupOrder);
 
 		return reducer;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
index fcbb888..5225b33 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/GroupReduceOperator.java
@@ -20,24 +20,20 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingSortedReduceGroupOperator;
-import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.DataSet;
 
 /**
@@ -132,9 +128,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		// offset semantic information by extracted key fields
 		if(props != null &&
 				this.grouper != null &&
-				this.grouper.keys instanceof Keys.SelectorFunctionKeys) {
+				this.grouper.keys instanceof SelectorFunctionKeys) {
 
-			int offset = ((Keys.SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
+			int offset = ((SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
 			if(this.grouper instanceof SortedGrouping) {
 				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
@@ -156,9 +152,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		// distinguish between grouped reduce and non-grouped reduce
 		if (grouper == null) {
 			// non grouped reduce
-			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
 			GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po =
-					new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, new int[0], name);
+					new GroupReduceOperatorBase<>(function, operatorInfo, new int[0], name);
 			
 			po.setCombinable(combinable);
 			po.setInput(input);
@@ -167,34 +163,27 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 			return po;
 		}
 	
-		if (grouper.getKeys() instanceof Keys.SelectorFunctionKeys) {
+		if (grouper.getKeys() instanceof SelectorFunctionKeys) {
 		
 			@SuppressWarnings("unchecked")
-			Keys.SelectorFunctionKeys<IN, ?> selectorKeys = (Keys.SelectorFunctionKeys<IN, ?>) grouper.getKeys();
+			SelectorFunctionKeys<IN, ?> selectorKeys = (SelectorFunctionKeys<IN, ?>) grouper.getKeys();
 
 			if (grouper instanceof SortedGrouping) {
-				SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
-				Keys.SelectorFunctionKeys<IN, ?> sortKeys = sortedGrouper.getSortSelectionFunctionKey();
-
-				PlanUnwrappingSortedReduceGroupOperator<IN, OUT, ?, ?> po = translateSelectorFunctionSortedReducer(
-						selectorKeys, sortKeys, function, getInputType(), getResultType(), name, input, isCombinable());
+				SortedGrouping<IN> sortedGrouping = (SortedGrouping<IN>) grouper;
+				SelectorFunctionKeys<IN, ?> sortKeys = sortedGrouping.getSortSelectionFunctionKey();
+				Ordering groupOrder = sortedGrouping.getGroupOrdering();
 
-				// set group order
-				int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
-				Order[] sortOrders = sortedGrouper.getGroupSortOrders();
-
-				Ordering o = new Ordering();
-				for(int i=0; i < sortKeyPositions.length; i++) {
-					o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
-				}
-				po.setGroupOrder(o);
+				PlanUnwrappingSortedReduceGroupOperator<IN, OUT, ?, ?> po =
+					translateSelectorFunctionSortedReducer(
+						selectorKeys, sortKeys, groupOrder, function, getResultType(), name, input, isCombinable()
+					);
 
 				po.setParallelism(this.getParallelism());
 				po.setCustomPartitioner(grouper.getCustomPartitioner());
 				return po;
 			} else {
 				PlanUnwrappingReduceGroupOperator<IN, OUT, ?> po = translateSelectorFunctionReducer(
-							selectorKeys, function, getInputType(), getResultType(), name, input, isCombinable());
+							selectorKeys, function, getResultType(), name, input, isCombinable());
 
 				po.setParallelism(this.getParallelism());
 				po.setCustomPartitioner(grouper.getCustomPartitioner());
@@ -204,9 +193,9 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 		else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
 
 			int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
-			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType());
+			UnaryOperatorInformation<IN, OUT> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
 			GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>> po =
-					new GroupReduceOperatorBase<IN, OUT, GroupReduceFunction<IN, OUT>>(function, operatorInfo, logicalKeyPositions, name);
+					new GroupReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
 
 			po.setCombinable(combinable);
 			po.setInput(input);
@@ -216,7 +205,7 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 			// set group order
 			if (grouper instanceof SortedGrouping) {
 				SortedGrouping<IN> sortedGrouper = (SortedGrouping<IN>) grouper;
-								
+
 				int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
 				Order[] sortOrders = sortedGrouper.getGroupSortOrders();
 				
@@ -236,56 +225,50 @@ public class GroupReduceOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT
 	
 	
 	// --------------------------------------------------------------------------------------------
-	
+
+	@SuppressWarnings("unchecked")
 	private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionReducer(
-			Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupReduceFunction<IN, OUT> function,
-			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input,
+			SelectorFunctionKeys<IN, ?> rawKeys,
+			GroupReduceFunction<IN, OUT> function,
+			TypeInformation<OUT> outputType,
+			String name,
+			Operator<IN> input,
 			boolean combinable)
 	{
-		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
-		
-		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
-		
-		KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
-		
-		PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable);
-		
-		MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
+		SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
+		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
+
+		Operator<Tuple2<K, IN>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
+
+		PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
+			new PlanUnwrappingReduceGroupOperator(function, keys, name, outputType, typeInfoWithKey, combinable);
+		reducer.setInput(keyedInput);
 
-		reducer.setInput(mapper);
-		mapper.setInput(input);
-		
-		// set the mapper's parallelism to the input parallelism to make sure it is chained
-		mapper.setParallelism(input.getParallelism());
-		
 		return reducer;
 	}
 
+	@SuppressWarnings("unchecked")
 	private static <IN, OUT, K1, K2> PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> translateSelectorFunctionSortedReducer(
-		Keys.SelectorFunctionKeys<IN, ?> rawGroupingKey, Keys.SelectorFunctionKeys<IN, ?> rawSortingKey, GroupReduceFunction<IN, OUT> function,
-		TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input,
+		SelectorFunctionKeys<IN, ?> rawGroupingKey,
+		SelectorFunctionKeys<IN, ?> rawSortingKey,
+		Ordering groupOrdering,
+		GroupReduceFunction<IN, OUT> function,
+		TypeInformation<OUT> outputType,
+		String name,
+		Operator<IN> input,
 		boolean combinable)
 	{
-		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<IN, K1> groupingKey = (Keys.SelectorFunctionKeys<IN, K1>) rawGroupingKey;
-
-		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<IN, K2> sortingKey = (Keys.SelectorFunctionKeys<IN, K2>) rawSortingKey;
-
-		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple3<K1, K2, IN>>(groupingKey.getKeyType(), sortingKey.getKeyType(), inputType);
-
-		TwoKeyExtractingMapper<IN, K1, K2> extractor = new TwoKeyExtractingMapper<IN, K1, K2>(groupingKey.getKeyExtractor(), sortingKey.getKeyExtractor());
-
-		PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> reducer = new PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2>(function, groupingKey, sortingKey, name, outputType, typeInfoWithKey, combinable);
-
-		MapOperatorBase<IN, Tuple3<K1, K2, IN>, MapFunction<IN, Tuple3<K1, K2, IN>>> mapper = new MapOperatorBase<IN, Tuple3<K1, K2, IN>, MapFunction<IN, Tuple3<K1, K2, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple3<K1, K2, IN>>(inputType, typeInfoWithKey), "Key Extractor");
+		final SelectorFunctionKeys<IN, K1> groupingKey = (SelectorFunctionKeys<IN, K1>) rawGroupingKey;
+		final SelectorFunctionKeys<IN, K2> sortingKey = (SelectorFunctionKeys<IN, K2>) rawSortingKey;
+		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(groupingKey,sortingKey);
 
-		reducer.setInput(mapper);
-		mapper.setInput(input);
+		Operator<Tuple3<K1, K2, IN>> inputWithKey = SelectorFunctionKeys.appendKeyExtractor(input, groupingKey, sortingKey);
 
-		// set the mapper's parallelism to the input parallelism to make sure it is chained
-		mapper.setParallelism(input.getParallelism());
+		PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> reducer =
+			new PlanUnwrappingSortedReduceGroupOperator<>(
+				function, groupingKey, sortingKey, name, outputType, typeInfoWithKey, combinable);
+		reducer.setInput(inputWithKey);
+		reducer.setGroupOrder(groupOrdering);
 
 		return reducer;
 	}
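
For the sorted groupings handled above in GroupCombineOperator and GroupReduceOperator, the refactoring has two further parts: the two-key variant of the same helpers produces a Tuple3 carrying both the grouping and the sorting key, and the group Ordering is no longer rebuilt from the sort key positions inside each operator but taken from SortedGrouping.getGroupOrdering() (presumably the addition listed for SortedGrouping.java in the diffstat) and set inside the translation method. A sketch of that part, again following only the call sites visible in this diff, with illustrative names and placed in the same package:

package org.apache.flink.api.java.operators;

import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.java.tuple.Tuple3;

class SortedGroupingTranslationSketch {

	// Old pattern: the operator reassembled the group Ordering after translation.
	static Ordering groupOrderBefore(SortedGrouping<?> sortedGrouper) {
		int[] sortKeyPositions = sortedGrouper.getGroupSortKeyPositions();
		Order[] sortOrders = sortedGrouper.getGroupSortOrders();
		Ordering o = new Ordering();
		for (int i = 0; i < sortKeyPositions.length; i++) {
			o.appendOrdering(sortKeyPositions[i], null, sortOrders[i]);
		}
		return o;
	}

	// New pattern: one helper call appends an extractor for both the grouping
	// and the sorting key; the Ordering now comes from the grouping itself.
	static <IN, K1, K2> Operator<Tuple3<K1, K2, IN>> keyedInputAfter(
			Operator<IN> input,
			SelectorFunctionKeys<IN, K1> groupingKey,
			SelectorFunctionKeys<IN, K2> sortingKey) {

		// Tuple3 type for the plan operator, built by the helper
		TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey =
				SelectorFunctionKeys.createTypeWithKey(groupingKey, sortingKey);
		Operator<Tuple3<K1, K2, IN>> keyedInput =
				SelectorFunctionKeys.appendKeyExtractor(input, groupingKey, sortingKey);
		// typeInfoWithKey is then passed to the PlanUnwrappingSorted* operator, and
		// reducer.setGroupOrder(sortedGrouping.getGroupOrdering()) replaces the loop
		// above (plan operator construction omitted in this sketch)
		return keyedInput;
	}
}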

http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index fac6b46..0c0b710 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -25,17 +25,14 @@ import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -47,10 +44,10 @@ import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.join.JoinOperatorSetsBase;
 import org.apache.flink.api.java.operators.join.JoinType;
 import org.apache.flink.api.java.operators.join.JoinFunctionAssigner;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.TupleRightUnwrappingJoiner;
 import org.apache.flink.api.java.operators.translation.TupleLeftUnwrappingJoiner;
 import org.apache.flink.api.java.operators.translation.TupleUnwrappingJoiner;
@@ -104,7 +101,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		// sanity check solution set key mismatches
 		if (input1 instanceof SolutionSetPlaceHolder) {
 			if (keys1 instanceof ExpressionKeys) {
-				int[] positions = ((ExpressionKeys<?>) keys1).computeLogicalKeyPositions();
+				int[] positions = keys1.computeLogicalKeyPositions();
 				((SolutionSetPlaceHolder<?>) input1).checkJoinKeyFields(positions);
 			} else {
 				throw new InvalidProgramException("Currently, the solution set may only be joined with using tuple field positions.");
@@ -112,7 +109,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 		if (input2 instanceof SolutionSetPlaceHolder) {
 			if (keys2 instanceof ExpressionKeys) {
-				int[] positions = ((ExpressionKeys<?>) keys2).computeLogicalKeyPositions();
+				int[] positions = keys2.computeLogicalKeyPositions();
 				((SolutionSetPlaceHolder<?>) input2).checkJoinKeyFields(positions);
 			} else {
 				throw new InvalidProgramException("Currently, the solution set may only be joined with using tuple field positions.");
@@ -260,15 +257,15 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 			// offset semantic information by extracted key fields
 			if(props != null &&
-					(this.keys1 instanceof Keys.SelectorFunctionKeys ||
-							this.keys2 instanceof Keys.SelectorFunctionKeys)) {
+					(this.keys1 instanceof SelectorFunctionKeys ||
+							this.keys2 instanceof SelectorFunctionKeys)) {
 
 				int numFields1 = this.getInput1Type().getTotalFields();
 				int numFields2 = this.getInput2Type().getTotalFields();
-				int offset1 = (this.keys1 instanceof Keys.SelectorFunctionKeys) ?
-						((Keys.SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
-				int offset2 = (this.keys2 instanceof Keys.SelectorFunctionKeys) ?
-						((Keys.SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
+				int offset1 = (this.keys1 instanceof SelectorFunctionKeys) ?
+						((SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
+				int offset2 = (this.keys2 instanceof SelectorFunctionKeys) ?
+						((SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
 
 				props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
 			}
@@ -315,40 +312,40 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 					.withJoinHint(getJoinHint())
 					.withResultType(getResultType());
 
-			final boolean requiresTupleUnwrapping = keys1 instanceof Keys.SelectorFunctionKeys || keys2 instanceof Keys.SelectorFunctionKeys;
+			final boolean requiresTupleUnwrapping = keys1 instanceof SelectorFunctionKeys || keys2 instanceof SelectorFunctionKeys;
 			if (requiresTupleUnwrapping) {
-				if (keys1 instanceof Keys.SelectorFunctionKeys && keys2 instanceof Keys.SelectorFunctionKeys) {
+				if (keys1 instanceof SelectorFunctionKeys && keys2 instanceof SelectorFunctionKeys) {
 					// Both join sides have a key selector function, so we need to do the
 					// tuple wrapping/unwrapping on both sides.
 
 					@SuppressWarnings("unchecked")
-					Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
+					SelectorFunctionKeys<I1, ?> selectorKeys1 = (SelectorFunctionKeys<I1, ?>) keys1;
 					@SuppressWarnings("unchecked")
-					Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+					SelectorFunctionKeys<I2, ?> selectorKeys2 = (SelectorFunctionKeys<I2, ?>) keys2;
 
 					builder = builder
 							.withUdf(new TupleUnwrappingJoiner<>(function))
-							.withWrappedInput1(input1, selectorKeys1, getInput1Type())
-							.withWrappedInput2(input2, selectorKeys2, getInput2Type());
-				} else if (keys2 instanceof Keys.SelectorFunctionKeys) {
+							.withWrappedInput1(input1, selectorKeys1)
+							.withWrappedInput2(input2, selectorKeys2);
+				} else if (keys2 instanceof SelectorFunctionKeys) {
 					// The right side of the join needs the tuple wrapping/unwrapping
 
 					@SuppressWarnings("unchecked")
-					Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
+					SelectorFunctionKeys<I2, ?> selectorKeys2 = (SelectorFunctionKeys<I2, ?>) keys2;
 
 					builder = builder
 							.withUdf(new TupleRightUnwrappingJoiner<>(function))
 							.withInput1(input1, getInput1Type(), keys1)
-							.withWrappedInput2(input2, selectorKeys2, getInput2Type());
+							.withWrappedInput2(input2, selectorKeys2);
 				} else {
 					// The left side of the join needs the tuple wrapping/unwrapping
 
 					@SuppressWarnings("unchecked")
-					Keys.SelectorFunctionKeys<I1, ?> selectorKeys1 = (Keys.SelectorFunctionKeys<I1, ?>) keys1;
+					SelectorFunctionKeys<I1, ?> selectorKeys1 = (SelectorFunctionKeys<I1, ?>) keys1;
 
 					builder = builder
 							.withUdf(new TupleLeftUnwrappingJoiner<>(function))
-							.withWrappedInput1(input1, selectorKeys1, getInput1Type())
+							.withWrappedInput1(input1, selectorKeys1)
 							.withInput2(input2, getInput2Type(), keys2);
 				}
 			} else if (keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys) {
@@ -393,24 +390,24 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 			public <I1, K> JoinOperatorBaseBuilder<OUT> withWrappedInput1(
 					Operator<I1> input1,
-					Keys.SelectorFunctionKeys<I1, ?> rawKeys1,
-					TypeInformation<I1> inputType1) {
-				TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = new TupleTypeInfo<>(rawKeys1.getKeyType(), inputType1);
+					SelectorFunctionKeys<I1, ?> rawKeys1) {
 
-				MapOperatorBase<I1, Tuple2<K, I1>, MapFunction<I1, Tuple2<K, I1>>> keyMapper1 =
-						createKeyMapper(rawKeys1, inputType1, input1, "Key Extractor 1");
+				@SuppressWarnings("unchecked")
+				SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>)rawKeys1;
+				TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = SelectorFunctionKeys.createTypeWithKey(keys1);
+				Operator<Tuple2<K, I1>> keyMapper1 = SelectorFunctionKeys.appendKeyExtractor(input1, keys1);
 
 				return this.withInput1(keyMapper1, typeInfoWithKey1, rawKeys1);
 			}
 
 			public <I2, K> JoinOperatorBaseBuilder<OUT> withWrappedInput2(
 					Operator<I2> input2,
-					Keys.SelectorFunctionKeys<I2, ?> rawKeys2,
-					TypeInformation<I2> inputType2) {
-				TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = new TupleTypeInfo<>(rawKeys2.getKeyType(), inputType2);
+					SelectorFunctionKeys<I2, ?> rawKeys2) {
 
-				MapOperatorBase<I2, Tuple2<K, I2>, MapFunction<I2, Tuple2<K, I2>>> keyMapper2 =
-						createKeyMapper(rawKeys2, inputType2, input2, "Key Extractor 2");
+				@SuppressWarnings("unchecked")
+				SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>)rawKeys2;
+				TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = SelectorFunctionKeys.createTypeWithKey(keys2);
+				Operator<Tuple2<K, I2>> keyMapper2 = SelectorFunctionKeys.appendKeyExtractor(input2, keys2);
 
 				return withInput2(keyMapper2, typeInfoWithKey2, rawKeys2);
 			}
@@ -500,27 +497,6 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 						throw new UnsupportedOperationException();
 				}
 			}
-
-			private static <I, K> MapOperatorBase<I, Tuple2<K, I>, MapFunction<I, Tuple2<K, I>>> createKeyMapper(
-					Keys.SelectorFunctionKeys<I, ?> rawKeys,
-					TypeInformation<I> inputType,
-					Operator<I> input,
-					String mapperName) {
-
-				@SuppressWarnings("unchecked")
-				final Keys.SelectorFunctionKeys<I, K> keys = (Keys.SelectorFunctionKeys<I, K>) rawKeys;
-				final TypeInformation<Tuple2<K, I>> typeInfoWithKey = new TupleTypeInfo<>(keys.getKeyType(), inputType);
-				final KeyExtractingMapper<I, K> extractor = new KeyExtractingMapper<>(keys.getKeyExtractor());
-
-				final MapOperatorBase<I, Tuple2<K, I>, MapFunction<I, Tuple2<K, I>>> keyMapper =
-						new MapOperatorBase<I, Tuple2<K, I>, MapFunction<I, Tuple2<K, I>>>(
-								extractor,
-								new UnaryOperatorInformation<>(inputType, typeInfoWithKey),
-								mapperName);
-				keyMapper.setInput(input);
-				keyMapper.setParallelism(input.getParallelism());
-				return keyMapper;
-			}
 		}
 	}
 	
@@ -607,7 +583,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin
 		 */
 		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectFirst(int... firstFieldIndexes) {
-			JoinProjection<I1, I2> joinProjection = new JoinProjection<I1, I2>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint(), firstFieldIndexes, null);
+			JoinProjection<I1, I2> joinProjection = new JoinProjection<>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint(), firstFieldIndexes, null);
 
 			return joinProjection.projectTupleX();
 		}
@@ -633,7 +609,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin
 		 */
 		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectSecond(int... secondFieldIndexes) {
-			JoinProjection<I1, I2> joinProjection = new JoinProjection<I1, I2>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint(), null, secondFieldIndexes);
+			JoinProjection<I1, I2> joinProjection = new JoinProjection<>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint(), null, secondFieldIndexes);
 			
 			return joinProjection.projectTupleX();
 		}
@@ -901,7 +877,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		@Override
 		public <K> JoinOperatorSetsPredicate where(KeySelector<I1, K> keySelector) {
 			TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
-			return new JoinOperatorSetsPredicate(new Keys.SelectorFunctionKeys<>(keySelector, input1.getType(), keyType));
+			return new JoinOperatorSetsPredicate(new SelectorFunctionKeys<>(keySelector, input1.getType(), keyType));
 		}
 
 
@@ -965,7 +941,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			@Override
 			public <K> DefaultJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
 				TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
-				return createDefaultJoin(new Keys.SelectorFunctionKeys<>(keySelector, input2.getType(), keyType));
+				return createDefaultJoin(new SelectorFunctionKeys<>(keySelector, input2.getType(), keyType));
 			}
 		}
 	}
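
For context, the key-selector variants of where() / equalTo() above are what route a join through the tuple-wrapping path in translateToDataFlow(). A minimal usage sketch (illustrative data and types):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

public class SelectorJoinExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, String>> users = env.fromElements(
            new Tuple2<>(1, "alice"), new Tuple2<>(2, "bob"));
        DataSet<Tuple2<Integer, Double>> orders = env.fromElements(
            new Tuple2<>(1, 9.99), new Tuple2<>(2, 3.50));

        // Both sides use a KeySelector, so the join is built via
        // withWrappedInput1()/withWrappedInput2(): each input is wrapped into
        // Tuple2<key, record> by SelectorFunctionKeys.appendKeyExtractor() and
        // unwrapped again by TupleUnwrappingJoiner.
        users.join(orders)
            .where(new KeySelector<Tuple2<Integer, String>, Integer>() {
                public Integer getKey(Tuple2<Integer, String> user) { return user.f0; }
            })
            .equalTo(new KeySelector<Tuple2<Integer, Double>, Integer>() {
                public Integer getKey(Tuple2<Integer, Double> order) { return order.f0; }
            })
            .print();
    }
}
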
@@ -980,7 +956,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	public static final class DefaultFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, Tuple2<T1, T2>> {
 
 		private static final long serialVersionUID = 1L;
-		private final Tuple2<T1, T2> outTuple = new Tuple2<T1, T2>();
+		private final Tuple2<T1, T2> outTuple = new Tuple2<>();
 
 		@Override
 		public void join(T1 first, T2 second, Collector<Tuple2<T1,T2>> out) throws Exception {
@@ -1071,14 +1047,14 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			boolean isSecondTuple;
 			
 			if(ds1.getType() instanceof TupleTypeInfo) {
-				numFieldsDs1 = ((TupleTypeInfo<?>)ds1.getType()).getArity();
+				numFieldsDs1 = ds1.getType().getArity();
 				isFirstTuple = true;
 			} else {
 				numFieldsDs1 = 1;
 				isFirstTuple = false;
 			}
 			if(ds2.getType() instanceof TupleTypeInfo) {
-				numFieldsDs2 = ((TupleTypeInfo<?>)ds2.getType()).getArity();
+				numFieldsDs2 = ds2.getType().getArity();
 				isSecondTuple = true;
 			} else {
 				numFieldsDs2 = 1;
@@ -1162,12 +1138,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		protected JoinProjection<I1, I2> projectFirst(int... firstFieldIndexes) {
 			
 			boolean isFirstTuple;
-			
-			if(ds1.getType() instanceof TupleTypeInfo && firstFieldIndexes.length > 0) {
-				isFirstTuple = true;
-			} else {
-				isFirstTuple = false;
-			}
+
+			isFirstTuple = ds1.getType() instanceof TupleTypeInfo && firstFieldIndexes.length > 0;
 			
 			if(!isFirstTuple && firstFieldIndexes.length != 0) {
 				// field index provided for non-Tuple input
@@ -1226,12 +1198,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		protected JoinProjection<I1, I2> projectSecond(int... secondFieldIndexes) {
 			
 			boolean isSecondTuple;
-			
-			if(ds2.getType() instanceof TupleTypeInfo && secondFieldIndexes.length > 0) {
-				isSecondTuple = true;
-			} else {
-				isSecondTuple = false;
-			}
+
+			isSecondTuple = ds2.getType() instanceof TupleTypeInfo && secondFieldIndexes.length > 0;
 			
 			if(!isSecondTuple && secondFieldIndexes.length != 0) {
 				// field index provided for non-Tuple input

http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 47c66f4..95ca300 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -26,13 +26,23 @@ import java.util.List;
 import com.google.common.base.Joiner;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
+import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
+import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.slf4j.Logger;
@@ -69,6 +79,7 @@ public abstract class Keys<T> {
 	public static class SelectorFunctionKeys<T, K> extends Keys<T> {
 
 		private final KeySelector<T, K> keyExtractor;
+		private final TypeInformation<T> inputType;
 		private final TypeInformation<K> keyType;
 		private final int[] logicalKeyFields;
 
@@ -81,6 +92,7 @@ public abstract class Keys<T> {
 			}
 
 			this.keyExtractor = keyExtractor;
+			this.inputType = inputType;
 			this.keyType = keyType;
 
 			if(!keyType.isKeyType()) {
@@ -90,7 +102,7 @@ public abstract class Keys<T> {
 			// we have to handle a special case here:
 			// if the keyType is a composite type, we need to select the full type with all its fields.
 			if(keyType instanceof CompositeType) {
-				ExpressionKeys<K> ek = new ExpressionKeys<K>(new String[] {ExpressionKeys.SELECT_ALL_CHAR}, keyType);
+				ExpressionKeys<K> ek = new ExpressionKeys<>(new String[]{ExpressionKeys.SELECT_ALL_CHAR}, keyType);
 				logicalKeyFields = ek.computeLogicalKeyPositions();
 			} else {
 				logicalKeyFields = new int[] {0};
@@ -101,6 +113,10 @@ public abstract class Keys<T> {
 			return keyType;
 		}
 
+		public TypeInformation<T> getInputType() {
+			return inputType;
+		}
+
 		public KeySelector<T, K> getKeyExtractor() {
 			return keyExtractor;
 		}
@@ -171,11 +187,92 @@ public abstract class Keys<T> {
 			}
 			
 			if (typeInfo != null && !(typeInfo instanceof GenericTypeInfo) && (!keyType.equals(typeInfo))) {
-				throw new InvalidProgramException("The partitioner is imcompatible with the key type. "
+				throw new InvalidProgramException("The partitioner is incompatible with the key type. "
 						+ "Partitioner type: " + typeInfo + " , key type: " + keyType);
 			}
 		}
 
+		@SuppressWarnings("unchecked")
+		public static <T, K> Operator<Tuple2<K, T>> appendKeyExtractor(
+			Operator<T> input,
+			SelectorFunctionKeys<T, K> key)
+		{
+
+			TypeInformation<T> inputType = key.getInputType();
+			TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
+			KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<>(key.getKeyExtractor());
+
+			MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> mapper =
+				new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(
+					extractor,
+					new UnaryOperatorInformation<>(inputType, typeInfoWithKey),
+					"Key Extractor"
+				);
+
+			mapper.setInput(input);
+			mapper.setParallelism(input.getParallelism());
+
+			return mapper;
+		}
+
+		@SuppressWarnings("unchecked")
+		public static <T, K1, K2> Operator<Tuple3<K1, K2, T>> appendKeyExtractor(
+			Operator<T> input,
+			SelectorFunctionKeys<T, K1> key1,
+			SelectorFunctionKeys<T, K2> key2)
+		{
+
+			TypeInformation<T> inputType = key1.getInputType();
+			TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2);
+			TwoKeyExtractingMapper<T, K1, K2> extractor =
+				new TwoKeyExtractingMapper<>(key1.getKeyExtractor(), key2.getKeyExtractor());
+
+			MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>> mapper =
+				new MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>>(
+					extractor,
+					new UnaryOperatorInformation<>(inputType, typeInfoWithKey),
+					"Key Extractor"
+				);
+
+			mapper.setInput(input);
+			mapper.setParallelism(input.getParallelism());
+
+			return mapper;
+		}
+
+		public static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> appendKeyRemover(
+			Operator<Tuple2<K, T>> inputWithKey,
+			SelectorFunctionKeys<T, K> key)
+		{
+
+			TypeInformation<T> inputType = key.getInputType();
+			TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
+
+			MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> mapper =
+				new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(
+					new KeyRemovingMapper<T, K>(),
+					new UnaryOperatorInformation<>(typeInfoWithKey, inputType),
+					"Key Remover"
+				);
+			mapper.setInput(inputWithKey);
+			mapper.setParallelism(inputWithKey.getParallelism());
+
+			return mapper;
+		}
+
+		public static <T, K> TypeInformation<Tuple2<K, T>> createTypeWithKey(
+			SelectorFunctionKeys<T, K> key)
+		{
+			return new TupleTypeInfo<>(key.getKeyType(), key.getInputType());
+		}
+
+		public static <T, K1, K2> TypeInformation<Tuple3<K1, K2, T>> createTypeWithKey(
+			SelectorFunctionKeys<T, K1> key1,
+			SelectorFunctionKeys<T, K2> key2)
+		{
+			return new TupleTypeInfo<>(key1.getKeyType(), key2.getKeyType(), key1.getInputType());
+		}
+
 		@Override
 		public String toString() {
 			return "Key function (Type: " + keyType + ")";
@@ -228,35 +325,30 @@ public abstract class Keys<T> {
 			}
 			Preconditions.checkArgument(groupingFields.length > 0, "Grouping fields can not be empty at this point");
 			
-			keyFields = new ArrayList<FlatFieldDescriptor>(type.getTotalFields());
+			keyFields = new ArrayList<>(type.getTotalFields());
 			// for each key, find the field:
-			for(int j = 0; j < groupingFields.length; j++) {
-				int keyPos = groupingFields[j];
-
+			for (int keyPos : groupingFields) {
 				int offset = 0;
-				for(int i = 0; i < type.getArity(); i++) {
+				for (int i = 0; i < type.getArity(); i++) {
 
-					TypeInformation fieldType = ((CompositeType<?>) type).getTypeAt(i);
-					if(i < keyPos) {
+					TypeInformation<?> fieldType = ((CompositeType<?>) type).getTypeAt(i);
+					if (i < keyPos) {
 						// not yet there, increment key offset
 						offset += fieldType.getTotalFields();
-					}
-					else {
+					} else {
 						// arrived at key position
 						if (!fieldType.isKeyType()) {
 							throw new InvalidProgramException("This type (" + fieldType + ") cannot be used as key.");
 						}
-						if(fieldType instanceof CompositeType) {
+						if (fieldType instanceof CompositeType) {
 							// add all nested fields of composite type
-							((CompositeType) fieldType).getFlatFields("*", offset, keyFields);
-						}
-						else if(fieldType instanceof AtomicType) {
+							((CompositeType<?>) fieldType).getFlatFields("*", offset, keyFields);
+						} else if (fieldType instanceof AtomicType) {
 							// add atomic type field
 							keyFields.add(new FlatFieldDescriptor(offset, fieldType));
-						}
-						else {
+						} else {
 							// type should either be composite or atomic
-							throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: "+fieldType);
+							throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: " + fieldType);
 						}
 						// go to next key
 						break;
@@ -267,7 +359,7 @@ public abstract class Keys<T> {
 		}
 
 		public static <R> List<R> removeNullElementsFromList(List<R> in) {
-			List<R> elements = new ArrayList<R>();
+			List<R> elements = new ArrayList<>();
 			for(R e: in) {
 				if(e != null) {
 					elements.add(e);
@@ -289,7 +381,7 @@ public abstract class Keys<T> {
 					throw new InvalidProgramException("Field expression for atomic type must be equal to '*' or '_'.");
 				}
 
-				keyFields = new ArrayList<FlatFieldDescriptor>(1);
+				keyFields = new ArrayList<>(1);
 				keyFields.add(new FlatFieldDescriptor(0, type));
 			} else {
 				CompositeType<T> cType = (CompositeType<T>) type;
@@ -299,9 +391,9 @@ public abstract class Keys<T> {
 					LOG.warn("The key expressions contained duplicates. They are now unique");
 				}
 				// extract the keys on their flat position
-				keyFields = new ArrayList<FlatFieldDescriptor>(expressions.length);
-				for (int i = 0; i < expressions.length; i++) {
-					List<FlatFieldDescriptor> keys = cType.getFlatFields(expressions[i]); // use separate list to do a size check
+				keyFields = new ArrayList<>(expressions.length);
+				for (String expression : expressions) {
+					List<FlatFieldDescriptor> keys = cType.getFlatFields(expression); // use separate list to do a size check
 					for (FlatFieldDescriptor key : keys) {
 						TypeInformation<?> keyType = key.getType();
 						if (!keyType.isKeyType()) {
@@ -311,8 +403,8 @@ public abstract class Keys<T> {
 							throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: " + keyType);
 						}
 					}
-					if(keys.size() == 0) {
-						throw new InvalidProgramException("Unable to extract key from expression '"+expressions[i]+"' on key "+cType);
+					if (keys.size() == 0) {
+						throw new InvalidProgramException("Unable to extract key from expression '" + expression + "' on key " + cType);
 					}
 					keyFields.addAll(keys);
 				}
@@ -351,7 +443,7 @@ public abstract class Keys<T> {
 
 		@Override
 		public int[] computeLogicalKeyPositions() {
-			List<Integer> logicalKeys = new ArrayList<Integer>();
+			List<Integer> logicalKeys = new ArrayList<>();
 			for (FlatFieldDescriptor kd : keyFields) {
 				logicalKeys.add(kd.getPosition());
 			}
@@ -390,7 +482,7 @@ public abstract class Keys<T> {
 	}
 	
 	private static String[] removeDuplicates(String[] in) {
-		List<String> ret = new LinkedList<String>();
+		List<String> ret = new LinkedList<>();
 		for(String el : in) {
 			if(!ret.contains(el)) {
 				ret.add(el);
@@ -406,7 +498,7 @@ public abstract class Keys<T> {
 	// --------------------------------------------------------------------------------------------
 
 
-	private static final int[] rangeCheckFields(int[] fields, int maxAllowedField) {
+	private static int[] rangeCheckFields(int[] fields, int maxAllowedField) {
 
 		// range check and duplicate eliminate
 		int i = 1, k = 0;
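
The per-operator key-extraction plumbing that used to be duplicated across operators is now centralized in the static helpers above (createTypeWithKey, appendKeyExtractor, appendKeyRemover). A minimal sketch of how a translation composes them; the class and method names below are illustrative only, the helpers themselves are the ones added in this commit:

import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.java.tuple.Tuple2;

final class KeyWrappingSketch {

    // Runs 'keyedOp' (an operator over Tuple2<key, record>, e.g. a
    // PartitionOperatorBase partitioning on field 0) between the generated
    // "Key Extractor" and "Key Remover" mappers. The keyed operator's
    // UnaryOperatorInformation would be built from
    // SelectorFunctionKeys.createTypeWithKey(key).
    static <T, K> SingleInputOperator<?, T, ?> wrapWithKey(
            Operator<T> input,
            SelectorFunctionKeys<T, K> key,
            SingleInputOperator<Tuple2<K, T>, Tuple2<K, T>, ?> keyedOp) {

        Operator<Tuple2<K, T>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, key);
        keyedOp.setInput(keyedInput);
        return SelectorFunctionKeys.appendKeyRemover(keyedOp, key);
    }

    private KeyWrappingSketch() {}
}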

http://git-wip-us.apache.org/repos/asf/flink/blob/544abb93/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index dd9dfb6..c3d46f2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -20,20 +20,16 @@ package org.apache.flink.api.java.operators;
 
 import com.google.common.base.Preconditions;
 
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
-import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
+import org.apache.flink.api.java.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
 /**
  * This operator represents a partitioning.
@@ -113,34 +109,31 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 		// distinguish between partition types
 		if (pMethod == PartitionMethod.REBALANCE) {
 			
-			UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getType(), getType());
-			PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, name);
+			UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<>(getType(), getType());
+			PartitionOperatorBase<T> rebalancedInput = new PartitionOperatorBase<>(operatorInfo, pMethod, name);
+			rebalancedInput.setInput(input);
+			rebalancedInput.setParallelism(getParallelism());
 			
-			noop.setInput(input);
-			noop.setParallelism(getParallelism());
-			
-			return noop;
+			return rebalancedInput;
 		} 
 		else if (pMethod == PartitionMethod.HASH || pMethod == PartitionMethod.CUSTOM || pMethod == PartitionMethod.RANGE) {
 			
 			if (pKeys instanceof Keys.ExpressionKeys) {
 				
 				int[] logicalKeyPositions = pKeys.computeLogicalKeyPositions();
-				UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getType(), getType());
-				PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, pMethod, logicalKeyPositions, name);
-				
-				noop.setInput(input);
-				noop.setParallelism(getParallelism());
-				noop.setCustomPartitioner(customPartitioner);
+				UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<>(getType(), getType());
+				PartitionOperatorBase<T> partitionedInput = new PartitionOperatorBase<>(operatorInfo, pMethod, logicalKeyPositions, name);
+				partitionedInput.setInput(input);
+				partitionedInput.setParallelism(getParallelism());
+				partitionedInput.setCustomPartitioner(customPartitioner);
 				
-				return noop;
+				return partitionedInput;
 			}
 			else if (pKeys instanceof Keys.SelectorFunctionKeys) {
 				
 				@SuppressWarnings("unchecked")
 				Keys.SelectorFunctionKeys<T, ?> selectorKeys = (Keys.SelectorFunctionKeys<T, ?>) pKeys;
-				MapOperatorBase<?, T, ?> po = translateSelectorFunctionPartitioner(selectorKeys, pMethod, getType(), name, input, getParallelism(), customPartitioner);
-				return po;
+				return translateSelectorFunctionPartitioner(selectorKeys, pMethod, name, input, getParallelism(), customPartitioner);
 			}
 			else {
 				throw new UnsupportedOperationException("Unrecognized key type.");
@@ -151,34 +144,28 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 			throw new UnsupportedOperationException("Unsupported partitioning method: " + pMethod.name());
 		}
 	}
-	
-	private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionPartitioner(Keys.SelectorFunctionKeys<T, ?> rawKeys,
-			PartitionMethod pMethod, TypeInformation<T> inputType, String name, Operator<T> input, int partitionDop, Partitioner<?> customPartitioner)
+
+	@SuppressWarnings("unchecked")
+	private static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateSelectorFunctionPartitioner(
+		SelectorFunctionKeys<T, ?> rawKeys,
+		PartitionMethod pMethod,
+		String name,
+		Operator<T> input,
+		int partitionDop,
+		Partitioner<?> customPartitioner)
 	{
-		@SuppressWarnings("unchecked")
-		final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys;
-		
-		TypeInformation<Tuple2<K, T>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, T>>(keys.getKeyType(), inputType);
-		UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>> operatorInfo = new UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>>(typeInfoWithKey, typeInfoWithKey);
-		
-		KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<T, K>(keys.getKeyExtractor());
-		
-		MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> keyExtractingMap = new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");
-		PartitionOperatorBase<Tuple2<K, T>> noop = new PartitionOperatorBase<Tuple2<K, T>>(operatorInfo, pMethod, new int[]{0}, name);
-		MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> keyRemovingMap = new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Extractor");
+		final SelectorFunctionKeys<T, K> keys = (SelectorFunctionKeys<T, K>) rawKeys;
+		TypeInformation<Tuple2<K, T>> typeInfoWithKey = SelectorFunctionKeys.createTypeWithKey(keys);
 
-		keyExtractingMap.setInput(input);
-		noop.setInput(keyExtractingMap);
-		keyRemovingMap.setInput(noop);
-		
-		noop.setCustomPartitioner(customPartitioner);
-		
-		// set parallelism
-		keyExtractingMap.setParallelism(input.getParallelism());
-		noop.setParallelism(partitionDop);
-		keyRemovingMap.setParallelism(partitionDop);
-		
-		return keyRemovingMap;
+		Operator<Tuple2<K, T>> keyedInput = SelectorFunctionKeys.appendKeyExtractor(input, keys);
+
+		PartitionOperatorBase<Tuple2<K, T>> keyedPartitionedInput =
+			new PartitionOperatorBase<>(new UnaryOperatorInformation<>(typeInfoWithKey, typeInfoWithKey), pMethod, new int[]{0}, name);
+		keyedPartitionedInput.setInput(keyedInput);
+		keyedPartitionedInput.setCustomPartitioner(customPartitioner);
+		keyedPartitionedInput.setParallelism(partitionDop);
+
+		return SelectorFunctionKeys.appendKeyRemover(keyedPartitionedInput, keys);
 	}
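
For context, the rewritten partition translation above is reached from the key-selector partitioning methods of the DataSet API. A minimal usage sketch (illustrative data, names, and partitioning logic):

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

public class SelectorPartitionExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Long>> events = env.fromElements(
            new Tuple2<>("a", 1L), new Tuple2<>("b", 2L), new Tuple2<>("a", 3L));

        // Partitioning on a KeySelector is translated into
        //   key extractor -> PartitionOperatorBase on field 0 -> key remover,
        // which is the chain built by translateSelectorFunctionPartitioner().
        events.partitionCustom(
                new Partitioner<String>() {
                    public int partition(String key, int numPartitions) {
                        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
                    }
                },
                new KeySelector<Tuple2<String, Long>, String>() {
                    public String getKey(Tuple2<String, Long> event) { return event.f0; }
                })
            .print();
    }
}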
 
 	

