flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dwysakow...@apache.org
Subject [3/6] flink git commit: [FLINK-7181] Activate checkstyle flink-java/operators/*
Date Tue, 25 Jul 2017 08:19:05 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 86ee2a2..4b5dc6b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.operators;
 
-import java.util.Arrays;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
@@ -31,9 +29,12 @@ import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.operators.base.OuterJoinOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -44,32 +45,53 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSec
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
-import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.common.operators.Keys.IncompatibleKeysException;
-import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
+import org.apache.flink.api.java.operators.join.JoinFunctionAssigner;
 import org.apache.flink.api.java.operators.join.JoinOperatorSetsBase;
 import org.apache.flink.api.java.operators.join.JoinType;
-import org.apache.flink.api.java.operators.join.JoinFunctionAssigner;
-import org.apache.flink.api.java.operators.translation.TupleRightUnwrappingJoiner;
 import org.apache.flink.api.java.operators.translation.TupleLeftUnwrappingJoiner;
+import org.apache.flink.api.java.operators.translation.TupleRightUnwrappingJoiner;
 import org.apache.flink.api.java.operators.translation.TupleUnwrappingJoiner;
 import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
-import org.apache.flink.api.java.tuple.*;
-//CHECKSTYLE.ON: AvoidStarImport
+import java.util.Arrays;
 
 /**
- * A {@link DataSet} that is the result of a Join transformation. 
- * 
+ * A {@link DataSet} that is the result of a Join transformation.
+ *
  * @param <I1> The type of the first input DataSet of the Join transformation.
  * @param <I2> The type of the second input DataSet of the Join transformation.
  * @param <OUT> The type of the result of the Join transformation.
- * 
+ *
  * @see DataSet
  */
 @Public
@@ -77,22 +99,20 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 	protected final Keys<I1> keys1;
 	protected final Keys<I2> keys2;
-	
+
 	private final JoinHint joinHint;
 	protected final JoinType joinType;
 
 	private Partitioner<?> customPartitioner;
-	
-	
+
 	protected JoinOperator(DataSet<I1> input1, DataSet<I2> input2,
 			Keys<I1> keys1, Keys<I2> keys2,
-			TypeInformation<OUT> returnType, JoinHint hint, JoinType type)
-	{
+			TypeInformation<OUT> returnType, JoinHint hint, JoinType type) {
 		super(input1, input2, returnType);
-		
+
 		Preconditions.checkNotNull(keys1);
 		Preconditions.checkNotNull(keys2);
-		
+
 		try {
 			if (!keys1.areCompatible(keys2)) {
 				throw new InvalidProgramException("The types of the key fields do not match.");
@@ -125,18 +145,18 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		this.joinHint = hint == null ? InnerJoinOperatorBase.JoinHint.OPTIMIZER_CHOOSES : hint;
 		this.joinType = type;
 	}
-	
+
 	protected Keys<I1> getKeys1() {
 		return this.keys1;
 	}
-	
+
 	protected Keys<I2> getKeys2() {
 		return this.keys2;
 	}
-	
+
 	/**
 	 * Gets the JoinHint that describes how the join is executed.
-	 * 
+	 *
 	 * @return The JoinHint.
 	 */
 	@Internal
@@ -153,14 +173,14 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	public JoinType getJoinType() {
 		return this.joinType;
 	}
-	
+
 	/**
 	 * Sets a custom partitioner for this join. The partitioner will be called on the join keys to determine
 	 * the partition a key should be assigned to. The partitioner is evaluated on both join inputs in the
 	 * same way.
-	 * <p>
-	 * NOTE: A custom partitioner can only be used with single-field join keys, not with composite join keys.
-	 * 
+	 *
+	 * <p>NOTE: A custom partitioner can only be used with single-field join keys, not with composite join keys.
+	 *
 	 * @param partitioner The custom partitioner to be used.
 	 * @return This join operator, to allow for function chaining.
 	 */
@@ -172,42 +192,43 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		this.customPartitioner = getInput1().clean(partitioner);
 		return this;
 	}
-	
+
 	/**
 	 * Gets the custom partitioner used by this join, or {@code null}, if none is set.
-	 * 
+	 *
 	 * @return The custom partitioner used by this join;
 	 */
 	@Internal
 	public Partitioner<?> getPartitioner() {
 		return customPartitioner;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// special join types
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 * A Join transformation that applies a {@link JoinFunction} on each pair of joining elements.<br>
-	 * It also represents the {@link DataSet} that is the result of a Join transformation. 
-	 * 
+	 * A Join transformation that applies a {@link JoinFunction} on each pair of joining elements.
+	 *
+	 * <p>It also represents the {@link DataSet} that is the result of a Join transformation.
+	 *
 	 * @param <I1> The type of the first input DataSet of the Join transformation.
 	 * @param <I2> The type of the second input DataSet of the Join transformation.
 	 * @param <OUT> The type of the result of the Join transformation.
-	 * 
+	 *
 	 * @see org.apache.flink.api.common.functions.RichFlatJoinFunction
 	 * @see DataSet
 	 */
 	@Public
 	public static class EquiJoin<I1, I2, OUT> extends JoinOperator<I1, I2, OUT> {
-		
+
 		private final FlatJoinFunction<I1, I2, OUT> function;
-		
+
 		@SuppressWarnings("unused")
 		private boolean preserve1;
 		@SuppressWarnings("unused")
 		private boolean preserve2;
-		
+
 		private final String joinLocationName;
 
 		public EquiJoin(DataSet<I1> input1, DataSet<I2> input2,
@@ -226,11 +247,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> function,
 				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName, JoinType type) {
 			super(input1, input2, keys1, keys2, returnType, hint, type);
-			
+
 			if (function == null) {
 				throw new NullPointerException();
 			}
-			
+
 			this.function = function;
 			this.joinLocationName = joinLocationName;
 
@@ -241,7 +262,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				Keys<I1> keys1, Keys<I2> keys2, FlatJoinFunction<I1, I2, OUT> generatedFunction, JoinFunction<I1, I2, OUT> function,
 				TypeInformation<OUT> returnType, JoinHint hint, String joinLocationName, JoinType type) {
 			super(input1, input2, keys1, keys2, returnType, hint, type);
-			
+
 			this.joinLocationName = joinLocationName;
 
 			if (function == null) {
@@ -252,7 +273,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 			UdfOperatorUtils.analyzeDualInputUdf(this, JoinFunction.class, joinLocationName, function, keys1, keys2);
 		}
-		
+
 		@Override
 		protected FlatJoinFunction<I1, I2, OUT> getFunction() {
 			return function;
@@ -265,16 +286,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			DualInputSemanticProperties props = super.getSemanticProperties();
 
 			// offset semantic information by extracted key fields
-			if(props != null &&
+			if (props != null &&
 					(this.keys1 instanceof SelectorFunctionKeys ||
 							this.keys2 instanceof SelectorFunctionKeys)) {
 
 				int numFields1 = this.getInput1Type().getTotalFields();
 				int numFields2 = this.getInput2Type().getTotalFields();
 				int offset1 = (this.keys1 instanceof SelectorFunctionKeys) ?
-						((SelectorFunctionKeys<?,?>) this.keys1).getKeyType().getTotalFields() : 0;
+						((SelectorFunctionKeys<?, ?>) this.keys1).getKeyType().getTotalFields() : 0;
 				int offset2 = (this.keys2 instanceof SelectorFunctionKeys) ?
-						((SelectorFunctionKeys<?,?>) this.keys2).getKeyType().getTotalFields() : 0;
+						((SelectorFunctionKeys<?, ?>) this.keys2).getKeyType().getTotalFields() : 0;
 
 				props = SemanticPropUtil.addSourceFieldOffsets(props, numFields1, numFields2, offset1, offset2);
 			}
@@ -371,7 +392,6 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			return builder.build();
 		}
 
-
 		private static final class JoinOperatorBaseBuilder<OUT> {
 
 			private final String name;
@@ -402,7 +422,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 					SelectorFunctionKeys<I1, ?> rawKeys1) {
 
 				@SuppressWarnings("unchecked")
-				SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>)rawKeys1;
+				SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
 				TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
 				Operator<Tuple2<K, I1>> keyMapper1 = KeyFunctions.appendKeyExtractor(input1, keys1);
 
@@ -414,7 +434,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 					SelectorFunctionKeys<I2, ?> rawKeys2) {
 
 				@SuppressWarnings("unchecked")
-				SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>)rawKeys2;
+				SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2;
 				TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2);
 				Operator<Tuple2<K, I2>> keyMapper2 = KeyFunctions.appendKeyExtractor(input2, keys2);
 
@@ -508,11 +528,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 		}
 	}
-	
+
 	/**
-	 * A Join transformation that wraps pairs of joining elements into {@link Tuple2}.<br>
-	 * It also represents the {@link DataSet} that is the result of a Join transformation. 
-	 * 
+	 * A Join transformation that wraps pairs of joining elements into {@link Tuple2}.
+	 *
+	 * <p>It also represents the {@link DataSet} that is the result of a Join transformation.
+	 *
 	 * @param <I1> The type of the first input DataSet of the Join transformation.
 	 * @param <I2> The type of the second input DataSet of the Join transformation.
 	 *
@@ -523,20 +544,20 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	public static final class DefaultJoin<I1, I2> extends EquiJoin<I1, I2, Tuple2<I1, I2>> implements JoinFunctionAssigner<I1, I2> {
 
 		public DefaultJoin(DataSet<I1> input1, DataSet<I2> input2,
-				Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, String joinLocationName, JoinType type)
-		{
+				Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, String joinLocationName, JoinType type) {
 			super(input1, input2, keys1, keys2,
 				new DefaultFlatJoinFunction<I1, I2>(),
 				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), hint, joinLocationName, type);
 		}
 
 		/**
-		 * Finalizes a Join transformation by applying a {@link org.apache.flink.api.common.functions.RichFlatJoinFunction} to each pair of joined elements.<br>
-		 * Each JoinFunction call returns exactly one element. 
-		 * 
+		 * Finalizes a Join transformation by applying a {@link org.apache.flink.api.common.functions.RichFlatJoinFunction} to each pair of joined elements.
+		 *
+		 * <p>Each JoinFunction call returns exactly one element.
+		 *
 		 * @param function The JoinFunction that is called for each pair of joined elements.
 		 * @return An EquiJoin that represents the joined result DataSet
-		 * 
+		 *
 		 * @see org.apache.flink.api.common.functions.RichFlatJoinFunction
 		 * @see org.apache.flink.api.java.operators.JoinOperator.EquiJoin
 		 * @see DataSet
@@ -558,8 +579,15 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			return new EquiJoin<>(getInput1(), getInput2(), getKeys1(), getKeys2(), generatedFunction, function, returnType, getJoinHint(), Utils.getCallLocationName(), joinType);
 		}
 
+		/**
+		 * Wrapper around {@link JoinFunction}.
+		 *
+		 * @param <IN1> type of elements of first collection
+		 * @param <IN2> type of elements of second collection
+		 * @param <OUT> type of the resulting elements
+		 */
 		@Internal
-		public static class WrappingFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<JoinFunction<IN1,IN2,OUT>> implements FlatJoinFunction<IN1, IN2, OUT> {
+		public static class WrappingFlatJoinFunction<IN1, IN2, OUT> extends WrappingFunction<JoinFunction<IN1, IN2, OUT>> implements FlatJoinFunction<IN1, IN2, OUT> {
 
 			private static final long serialVersionUID = 1L;
 
@@ -574,11 +602,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Applies a ProjectJoin transformation and projects the first join input<br>
-		 * If the first join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
-		 * If the first join input is not a Tuple DataSet, no parameters should be passed.<br>
-		 * 
-		 * Fields of the first and second input can be added by chaining the method calls of
+		 * Applies a ProjectJoin transformation and projects the first join input.
+		 *
+		 * <p>If the first join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
+		 * If the first join input is not a Tuple DataSet, no parameters should be passed.
+		 *
+		 * <p>Fields of the first and second input can be added by chaining the method calls of
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
 		 *
@@ -588,7 +617,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
 		 * @return A ProjectJoin which represents the projected join result.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin
@@ -598,67 +627,69 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 			return joinProjection.projectTupleX();
 		}
-		
+
 		/**
-		 * Applies a ProjectJoin transformation and projects the second join input<br>
-		 * If the second join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
-		 * If the second join input is not a Tuple DataSet, no parameters should be passed.<br>
-		 * 
-		 * Fields of the first and second input can be added by chaining the method calls of
+		 * Applies a ProjectJoin transformation and projects the second join input.
+		 *
+		 * <p>If the second join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
+		 * If the second join input is not a Tuple DataSet, no parameters should be passed.
+		 *
+		 * <p>Fields of the first and second input can be added by chaining the method calls of
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
 		 *
-		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+		 * <p><b>Note: With the current implementation, the Project transformation loses type information.</b>
 		 *
-		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields. 
+		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
 		 * @return A ProjectJoin which represents the projected join result.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin
 		 */
 		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectSecond(int... secondFieldIndexes) {
 			JoinProjection<I1, I2> joinProjection = new JoinProjection<>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint(), null, secondFieldIndexes);
-			
+
 			return joinProjection.projectTupleX();
 		}
 
 //		public JoinOperator<I1, I2, I1> leftSemiJoin() {
 //			return new LeftSemiJoin<I1, I2>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint());
 //		}
-		
+
 //		public JoinOperator<I1, I2, I2> rightSemiJoin() {
 //			return new RightSemiJoin<I1, I2>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint());
 //		}
-		
+
 //		public JoinOperator<I1, I2, I1> leftAntiJoin() {
 //			return new LeftAntiJoin<I1, I2>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint());
 //		}
-		
+
 //		public JoinOperator<I1, I2, I2> rightAntiJoin() {
 //			return new RightAntiJoin<I1, I2>(getInput1(), getInput2(), getKeys1(), getKeys2(), getJoinHint());
 //		}
 	}
-	
+
 	/**
-	 * A Join transformation that projects joining elements or fields of joining {@link Tuple Tuples} 
-	 * into result {@link Tuple Tuples}. <br>
-	 * It also represents the {@link DataSet} that is the result of a Join transformation. 
-	 * 
+	 * A Join transformation that projects joining elements or fields of joining {@link Tuple Tuples}
+	 * into result {@link Tuple Tuples}.
+	 *
+	 * <p>It also represents the {@link DataSet} that is the result of a Join transformation.
+	 *
 	 * @param <I1> The type of the first input DataSet of the Join transformation.
 	 * @param <I2> The type of the second input DataSet of the Join transformation.
 	 * @param <OUT> The type of the result of the Join transformation.
-	 * 
+	 *
 	 * @see Tuple
 	 * @see DataSet
 	 */
 	@Public
 	public static class ProjectJoin<I1, I2, OUT extends Tuple> extends EquiJoin<I1, I2, OUT> {
-		
+
 		private JoinProjection<I1, I2> joinProj;
-		
+
 		protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType) {
 			super(input1, input2, keys1, keys2,
 					new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()),
@@ -666,7 +697,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 			joinProj = null;
 		}
-		
+
 		protected ProjectJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] fields, boolean[] isFromFirst, TupleTypeInfo<OUT> returnType, JoinProjection<I1, I2> joinProj) {
 			super(input1, input2, keys1, keys2,
 					new ProjectFlatJoinFunction<I1, I2, OUT>(fields, isFromFirst, returnType.createSerializer(input1.getExecutionEnvironment().getConfig()).createInstance()),
@@ -681,15 +712,16 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Continues a ProjectJoin transformation and adds fields of the first join input to the projection.<br>
-		 * If the first join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
-		 * If the first join input is not a Tuple DataSet, no parameters should be passed.<br>
+		 * Continues a ProjectJoin transformation and adds fields of the first join input to the projection.
+		 *
+		 * <p>If the first join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
+		 * If the first join input is not a Tuple DataSet, no parameters should be passed.
 		 *
-		 * Additional fields of the first and second input can be added by chaining the method calls of
+		 * <p>Additional fields of the first and second input can be added by chaining the method calls of
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
 		 *
-		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+		 * <p><b>Note: With the current implementation, the Project transformation loses type information.</b>
 		 *
 		 * @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
@@ -701,22 +733,23 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin
 		 */
 		@SuppressWarnings("hiding")
-		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectFirst(int... firstFieldIndexes) {	
+		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectFirst(int... firstFieldIndexes) {
 			joinProj = joinProj.projectFirst(firstFieldIndexes);
-			
+
 			return joinProj.projectTupleX();
 		}
 
 		/**
-		 * Continues a ProjectJoin transformation and adds fields of the second join input to the projection.<br>
-		 * If the second join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
-		 * If the second join input is not a Tuple DataSet, no parameters should be passed.<br>
+		 * Continues a ProjectJoin transformation and adds fields of the second join input to the projection.
 		 *
-		 * Additional fields of the first and second input can be added by chaining the method calls of
+		 * <p>If the second join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
+		 * If the second join input is not a Tuple DataSet, no parameters should be passed.
+		 *
+		 * <p>Additional fields of the first and second input can be added by chaining the method calls of
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
 		 *
-		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+		 * <p><b>Note: With the current implementation, the Project transformation loses type information.</b>
 		 *
 		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
@@ -730,7 +763,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		@SuppressWarnings("hiding")
 		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectSecond(int... secondFieldIndexes) {
 			joinProj = joinProj.projectSecond(secondFieldIndexes);
-			
+
 			return joinProj.projectTupleX();
 		}
 
@@ -743,15 +776,15 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		@Deprecated
 		@PublicEvolving
 		public <OUT extends Tuple> JoinOperator<I1, I2, OUT> types(Class<?>... types) {
-			TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType();
+			TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>) this.getResultType();
 
-			if(types.length != typeInfo.getArity()) {
+			if (types.length != typeInfo.getArity()) {
 				throw new InvalidProgramException("Provided types do not match projection.");
 			}
-			for (int i=0; i<types.length; i++) {
+			for (int i = 0; i < types.length; i++) {
 				Class<?> typeClass = types[i];
 				if (!typeClass.equals(typeInfo.getTypeAt(i).getTypeClass())) {
-					throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection");
+					throw new InvalidProgramException("Provided type " + typeClass.getSimpleName() + " at position " + i + " does not match projection");
 				}
 			}
 			return (JoinOperator<I1, I2, OUT>) this;
@@ -766,7 +799,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		public JoinOperator<I1, I2, OUT> withForwardedFieldsSecond(String... forwardedFieldsSecond) {
 			throw new InvalidProgramException("The semantic properties (forwarded fields) are automatically calculated.");
 		}
-		
+
 		@Override
 		protected DualInputSemanticProperties extractSemanticAnnotationsFromUdf(Class<?> udfClass) {
 			// we do not extract the annotation, we construct the properties from the projection#
@@ -775,68 +808,69 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 	}
-	
+
 //	@SuppressWarnings("unused")
 //	private static final class LeftAntiJoin<I1, I2> extends JoinOperator<I1, I2, I1> {
-//		
+//
 //		protected LeftAntiJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint) {
 //			super(input1, input2, keys1, keys2, input1.getType(), hint);
 //		}
-//		
+//
 //		@Override
 //		protected Operator<I1> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
 //			throw new UnsupportedOperationException("LeftAntiJoin operator currently not supported.");
 //		}
 //	}
-	
+
 //	@SuppressWarnings("unused")
 //	private static final class RightAntiJoin<I1, I2> extends JoinOperator<I1, I2, I2> {
-//		
+//
 //		protected RightAntiJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint) {
 //			super(input1, input2, keys1, keys2, input2.getType(), hint);
 //		}
-//		
+//
 //		@Override
 //		protected Operator<I2> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
 //			throw new UnsupportedOperationException("RightAntiJoin operator currently not supported.");
 //		}
 //	}
-	
+
 //	@SuppressWarnings("unused")
 //	private static final class LeftSemiJoin<I1, I2> extends EquiJoin<I1, I2, I1> {
-//		
+//
 //		protected LeftSemiJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint) {
 //			super(input1, input2, keys1, keys2, new LeftSemiJoinFunction<I1, I2>(), input1.getType(), hint);
 //		}
-//		
+//
 //		@Override
 //		protected Operator<I1> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
 //			// TODO: Runtime support required. Each left tuple may be returned only once.
-//			// 	     Special exec strategy (runtime + optimizer) based on hash join required. 
+//			// 	     Special exec strategy (runtime + optimizer) based on hash join required.
 //			// 		 Either no duplicates of right side in HT or left tuples removed from HT after first match.
 //			throw new UnsupportedOperationException("LeftSemiJoin operator currently not supported.");
 //		}
 //	}
-	
+
 //	@SuppressWarnings("unused")
 //	private static final class RightSemiJoin<I1, I2> extends EquiJoin<I1, I2, I2> {
-//		
+//
 //		protected RightSemiJoin(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint) {
 //			super(input1, input2, keys1, keys2, new RightSemiJoinFunction<I1, I2>(), input2.getType(), hint);
 //		}
-//		
+//
 //		@Override
 //		protected Operator<I2> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
 //			// TODO: Runtime support required. Each right tuple may be returned only once.
-//			// 	     Special exec strategy (runtime + optimizer) based on hash join required. 
+//			// 	     Special exec strategy (runtime + optimizer) based on hash join required.
 //			// 		 Either no duplicates of left side in HT or right tuples removed from HT after first match.
 //			throw new UnsupportedOperationException("RightSemiJoin operator currently not supported.");
 //		}
 //	}
 
 	/**
-	 * Intermediate step of a Join transformation. <br>
-	 * To continue the Join transformation, select the join key of the first input {@link DataSet} by calling
+	 * Intermediate step of a Join transformation.
+	 *
+	 * <p>To continue the Join transformation, select the join key of the first input {@link DataSet} by calling
 	 * {@link JoinOperatorSets#where(int...)} or
 	 * {@link JoinOperatorSets#where(org.apache.flink.api.java.functions.KeySelector)}.
 	 *
@@ -894,10 +928,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			return new JoinOperatorSetsPredicate(new SelectorFunctionKeys<>(input1.clean(keySelector), input1.getType(), keyType));
 		}
 
-
 		/**
-		 * Intermediate step of a Join transformation. <br>
-		 * To continue the Join transformation, select the join key of the second input {@link DataSet} by calling
+		 * Intermediate step of a Join transformation.
+		 *
+		 * <p>To continue the Join transformation, select the join key of the second input {@link DataSet} by calling
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(int...)} or
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets.JoinOperatorSetsPredicate#equalTo(KeySelector)}.
 		 */
@@ -910,10 +944,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 			/**
 			 * Continues a Join transformation and defines the {@link Tuple} fields of the second join
-			 * {@link DataSet} that should be used as join keys.<br>
-			 * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br>
-			 * <p>
-			 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
+			 * {@link DataSet} that should be used as join keys.
+			 *
+			 * <p><b>Note: Fields can only be selected as join keys on Tuple DataSets.</b>
+			 *
+			 * <p>The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
 			 * the element of the first input being the first field of the tuple and the element of the
 			 * second input being the second field of the tuple.
 			 *
@@ -927,9 +962,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 			/**
 			 * Continues a Join transformation and defines the fields of the second join
-			 * {@link DataSet} that should be used as join keys.<br>
-			 * <p>
-			 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
+			 * {@link DataSet} that should be used as join keys.
+			 *
+			 * <p>The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
 			 * the element of the first input being the first field of the tuple and the element of the
 			 * second input being the second field of the tuple.
 			 *
@@ -942,11 +977,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 
 			/**
-			 * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}.<br>
-			 * The KeySelector function is called for each element of the second DataSet and extracts a single
-			 * key value on which the DataSet is joined. <br>
-			 * <p>
-			 * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
+			 * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}.
+			 *
+			 * <p>The KeySelector function is called for each element of the second DataSet and extracts a single
+			 * key value on which the DataSet is joined.
+			 *
+			 * <p>The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with
 			 * the element of the first input being the first field of the tuple and the element of the
 			 * second input being the second field of the tuple.
 			 *
@@ -961,7 +997,6 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 	}
 
-	
 	// --------------------------------------------------------------------------------------------
 	//  default join functions
 	// --------------------------------------------------------------------------------------------
@@ -969,13 +1004,13 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	@ForwardedFieldsFirst("*->0")
 	@ForwardedFieldsSecond("*->1")
 	@Internal
-	public static final class DefaultFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, Tuple2<T1, T2>> {
+	private static final class DefaultFlatJoinFunction<T1, T2> extends RichFlatJoinFunction<T1, T2, Tuple2<T1, T2>> {
 
 		private static final long serialVersionUID = 1L;
 		private final Tuple2<T1, T2> outTuple = new Tuple2<>();
 
 		@Override
-		public void join(T1 first, T2 second, Collector<Tuple2<T1,T2>> out) throws Exception {
+		public void join(T1 first, T2 second, Collector<Tuple2<T1, T2>> out) throws Exception {
 			outTuple.f0 = first;
 			outTuple.f1 = second;
 			out.collect(outTuple);
@@ -983,26 +1018,26 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 	}
 
 	@Internal
-	public static final class ProjectFlatJoinFunction<T1, T2, R extends Tuple> extends RichFlatJoinFunction<T1, T2, R> {
-		
+	private static final class ProjectFlatJoinFunction<T1, T2, R extends Tuple> extends RichFlatJoinFunction<T1, T2, R> {
+
 		private static final long serialVersionUID = 1L;
-		
+
 		private final int[] fields;
 		private final boolean[] isFromFirst;
 		private final R outTuple;
-	
+
 		/**
 		 * Instantiates and configures a ProjectJoinFunction.
 		 * Creates output tuples by copying fields of joined input tuples (or a full input object) into an output tuple.
-		 * 
-		 * @param fields List of indexes fields that should be copied to the output tuple. 
-		 * 					If the full input object should be copied (for example in case of a non-tuple input) the index should be -1. 
+		 *
+		 * @param fields List of field indexes that should be copied to the output tuple.
+		 * 					If the full input object should be copied (for example in case of a non-tuple input) the index should be -1.
 		 * @param isFromFirst List of flags indicating whether the field should be copied from the first (true) or the second (false) input.
 		 * @param outTupleInstance An instance of an output tuple.
 		 */
 		private ProjectFlatJoinFunction(int[] fields, boolean[] isFromFirst, R outTupleInstance) {
-			if(fields.length != isFromFirst.length) {
-				throw new IllegalArgumentException("Fields and isFromFirst arrays must have same length!"); 
+			if (fields.length != isFromFirst.length) {
+				throw new IllegalArgumentException("Fields and isFromFirst arrays must have same length!");
 			}
 
 			this.fields = fields;
@@ -1036,12 +1071,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 			out.collect(outTuple);
 		}
-		
+
 	}
 
 	@Internal
-	public static final class JoinProjection<I1, I2> {
-		
+	private static final class JoinProjection<I1, I2> {
+
 		private final DataSet<I1> ds1;
 		private final DataSet<I2> ds2;
 		private final Keys<I1> keys1;
@@ -1050,10 +1085,10 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 		private int[] fieldIndexes;
 		private boolean[] isFieldInFirst;
-		
+
 		private final int numFieldsDs1;
 		private final int numFieldsDs2;
-		
+
 		public JoinProjection(DataSet<I1> ds1, DataSet<I2> ds2, Keys<I1> keys1, Keys<I2> keys2, JoinHint hint, int[] firstFieldIndexes, int[] secondFieldIndexes) {
 			this.ds1 = ds1;
 			this.ds2 = ds2;
@@ -1063,32 +1098,32 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 			boolean isFirstTuple;
 			boolean isSecondTuple;
-			
-			if(ds1.getType() instanceof TupleTypeInfo) {
+
+			if (ds1.getType() instanceof TupleTypeInfo) {
 				numFieldsDs1 = ds1.getType().getArity();
 				isFirstTuple = true;
 			} else {
 				numFieldsDs1 = 1;
 				isFirstTuple = false;
 			}
-			if(ds2.getType() instanceof TupleTypeInfo) {
+			if (ds2.getType() instanceof TupleTypeInfo) {
 				numFieldsDs2 = ds2.getType().getArity();
 				isSecondTuple = true;
 			} else {
 				numFieldsDs2 = 1;
 				isSecondTuple = false;
 			}
-			
+
 			boolean isTuple;
 			boolean firstInput;
-			
-			if(firstFieldIndexes != null && secondFieldIndexes == null) {
+
+			if (firstFieldIndexes != null && secondFieldIndexes == null) {
 				// index array for first input is provided
 				firstInput = true;
 				isTuple = isFirstTuple;
 				this.fieldIndexes = firstFieldIndexes;
-				
-				if(this.fieldIndexes.length == 0) {
+
+				if (this.fieldIndexes.length == 0) {
 					// no indexes provided, treat tuple as regular object
 					isTuple = false;
 				}
@@ -1097,8 +1132,8 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				firstInput = false;
 				isTuple = isSecondTuple;
 				this.fieldIndexes = secondFieldIndexes;
-				
-				if(this.fieldIndexes.length == 0) {
+
+				if (this.fieldIndexes.length == 0) {
 					// no indexes provided, treat tuple as regular object
 					isTuple = false;
 				}
@@ -1107,23 +1142,23 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			} else {
 				throw new IllegalArgumentException("You must provide at most one field index array.");
 			}
-			
-			if(!isTuple && this.fieldIndexes.length != 0) {
+
+			if (!isTuple && this.fieldIndexes.length != 0) {
 				// field index provided for non-Tuple input
 				throw new IllegalArgumentException("Input is not a Tuple. Call projectFirst() (or projectSecond()) without arguments to include it.");
-			} else if(this.fieldIndexes.length > 22) {
+			} else if (this.fieldIndexes.length > 22) {
 				throw new IllegalArgumentException("You may select only up to twenty-two (22) fields.");
 			}
-			
-			if(isTuple) {
+
+			if (isTuple) {
 				this.isFieldInFirst = new boolean[this.fieldIndexes.length];
-				
+
 				// check field indexes and adapt to position in tuple
 				int maxFieldIndex = firstInput ? numFieldsDs1 : numFieldsDs2;
-				for(int i=0; i<this.fieldIndexes.length; i++) {
+				for (int i = 0; i < this.fieldIndexes.length; i++) {
 					Preconditions.checkElementIndex(this.fieldIndexes[i], maxFieldIndex);
 
-					if(firstInput) {
+					if (firstInput) {
 						this.isFieldInFirst[i] = true;
 					} else {
 						this.isFieldInFirst[i] = false;
@@ -1135,48 +1170,49 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			}
 
 		}
-		
+
 		/**
-		 * Continues a ProjectJoin transformation and adds fields of the first join input.<br>
-		 * If the first join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
-		 * If the first join input is not a Tuple DataSet, no parameters should be passed.<br>
-		 * 
-		 * Fields of the first and second input can be added by chaining the method calls of
-		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinProjection#projectFirst(int...)} and 
+		 * Continues a ProjectJoin transformation and adds fields of the first join input.
+		 *
+		 * <p>If the first join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
+		 * If the first join input is not a Tuple DataSet, no parameters should be passed.
+		 *
+		 * <p>Fields of the first and second input can be added by chaining the method calls of
+		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinProjection#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinProjection#projectSecond(int...)}.
-		 * 
+		 *
 		 * @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
 		 * @return An extended JoinProjection.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
 		protected JoinProjection<I1, I2> projectFirst(int... firstFieldIndexes) {
-			
+
 			boolean isFirstTuple;
 
 			isFirstTuple = ds1.getType() instanceof TupleTypeInfo && firstFieldIndexes.length > 0;
-			
-			if(!isFirstTuple && firstFieldIndexes.length != 0) {
+
+			if (!isFirstTuple && firstFieldIndexes.length != 0) {
 				// field index provided for non-Tuple input
 				throw new IllegalArgumentException("Input is not a Tuple. Call projectFirst() without arguments to include it.");
-			} else if(firstFieldIndexes.length > (22 - this.fieldIndexes.length)) {
+			} else if (firstFieldIndexes.length > (22 - this.fieldIndexes.length)) {
 				// to many field indexes provided
 				throw new IllegalArgumentException("You may select only up to twenty-two (22) fields in total.");
 			}
-			
+
 			int offset = this.fieldIndexes.length;
-			
-			if(isFirstTuple) {
+
+			if (isFirstTuple) {
 				// extend index and flag arrays
 				this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + firstFieldIndexes.length);
 				this.isFieldInFirst = Arrays.copyOf(this.isFieldInFirst, this.isFieldInFirst.length + firstFieldIndexes.length);
-				
+
 				// copy field indexes
 				int maxFieldIndex = numFieldsDs1;
-				for(int i = 0; i < firstFieldIndexes.length; i++) {
+				for (int i = 0; i < firstFieldIndexes.length; i++) {
 					// check if indexes in range
 					Preconditions.checkElementIndex(firstFieldIndexes[i], maxFieldIndex);
 
@@ -1187,59 +1223,60 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				// extend index and flag arrays
 				this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + 1);
 				this.isFieldInFirst = Arrays.copyOf(this.isFieldInFirst, this.isFieldInFirst.length + 1);
-				
+
 				// add input object to output tuple
 				this.isFieldInFirst[offset] = true;
 				this.fieldIndexes[offset] = -1;
 			}
-			
+
 			return this;
 		}
-		
+
 		/**
-		 * Continues a ProjectJoin transformation and adds fields of the second join input.<br>
-		 * If the second join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
-		 * If the second join input is not a Tuple DataSet, no parameters should be passed.<br>
-		 * 
-		 * Fields of the first and second input can be added by chaining the method calls of
+		 * Continues a ProjectJoin transformation and adds fields of the second join input.
+		 *
+		 * <p>If the second join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
+		 * If the second join input is not a Tuple DataSet, no parameters should be passed.
+		 *
+		 * <p>Fields of the first and second input can be added by chaining the method calls of
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinProjection#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinProjection#projectSecond(int...)}.
-		 * 
-		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields. 
+		 *
+		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
 		 * @return An extended JoinProjection.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
 		protected JoinProjection<I1, I2> projectSecond(int... secondFieldIndexes) {
-			
+
 			boolean isSecondTuple;
 
 			isSecondTuple = ds2.getType() instanceof TupleTypeInfo && secondFieldIndexes.length > 0;
-			
-			if(!isSecondTuple && secondFieldIndexes.length != 0) {
+
+			if (!isSecondTuple && secondFieldIndexes.length != 0) {
 				// field index provided for non-Tuple input
 				throw new IllegalArgumentException("Input is not a Tuple. Call projectSecond() without arguments to include it.");
-			} else if(secondFieldIndexes.length > (22 - this.fieldIndexes.length)) {
+			} else if (secondFieldIndexes.length > (22 - this.fieldIndexes.length)) {
 				// to many field indexes provided
 				throw new IllegalArgumentException("You may select only up to twenty-two (22) fields in total.");
 			}
-			
+
 			int offset = this.fieldIndexes.length;
-			
-			if(isSecondTuple) {
+
+			if (isSecondTuple) {
 				// extend index and flag arrays
 				this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + secondFieldIndexes.length);
 				this.isFieldInFirst = Arrays.copyOf(this.isFieldInFirst, this.isFieldInFirst.length + secondFieldIndexes.length);
-				
+
 				// copy field indexes
 				int maxFieldIndex = numFieldsDs2;
-				for(int i = 0; i < secondFieldIndexes.length; i++) {
+				for (int i = 0; i < secondFieldIndexes.length; i++) {
 					// check if indexes in range
 					Preconditions.checkElementIndex(secondFieldIndexes[i], maxFieldIndex);
-					
+
 					this.isFieldInFirst[offset + i] = false;
 					this.fieldIndexes[offset + i] = secondFieldIndexes[i];
 				}
@@ -1247,27 +1284,27 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 				// extend index and flag arrays
 				this.fieldIndexes = Arrays.copyOf(this.fieldIndexes, this.fieldIndexes.length + 1);
 				this.isFieldInFirst = Arrays.copyOf(this.isFieldInFirst, this.isFieldInFirst.length + 1);
-				
+
 				// add input object to output tuple
 				this.isFieldInFirst[offset] = false;
 				this.fieldIndexes[offset] = -1;
 			}
-			
+
 			return this;
 		}
-		
-		// --------------------------------------------------------------------------------------------	
+
+		// --------------------------------------------------------------------------------------------
 		// The following lines are generated.
-		// --------------------------------------------------------------------------------------------	
-		// BEGIN_OF_TUPLE_DEPENDENT_CODE	
+		// --------------------------------------------------------------------------------------------
+		// BEGIN_OF_TUPLE_DEPENDENT_CODE
 	// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
 
 		/**
 		 * Chooses a projectTupleX according to the length of
-		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinProjection#fieldIndexes}
-		 * 
+		 * {@link org.apache.flink.api.java.operators.JoinOperator.JoinProjection#fieldIndexes}.
+		 *
 		 * @return The projected DataSet.
-		 * 
+		 *
 		 * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin
 		 */
 		@SuppressWarnings("unchecked")
@@ -1307,11 +1344,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1323,11 +1360,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1339,11 +1376,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1355,11 +1392,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1371,11 +1408,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1387,11 +1424,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1403,11 +1440,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1419,11 +1456,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1435,11 +1472,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1451,11 +1488,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1467,11 +1504,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1483,11 +1520,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1499,11 +1536,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1515,11 +1552,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1531,11 +1568,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1547,11 +1584,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1563,11 +1600,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1579,11 +1616,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1595,11 +1632,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1611,11 +1648,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1627,11 +1664,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1643,11 +1680,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1659,11 +1696,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1675,11 +1712,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1691,11 +1728,11 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. 
-		 * Requires the classes of the fields of the resulting tuples. 
-		 * 
+		 * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields.
+		 * Requires the classes of the fields of the resulting tuples.
+		 *
 		 * @return The projected data set.
-		 * 
+		 *
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -1708,23 +1745,23 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 		// END_OF_TUPLE_DEPENDENT_CODE
 		// -----------------------------------------------------------------------------------------
-		
+
 		private TypeInformation<?>[] extractFieldTypes(int[] fields) {
-			
+
 			TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
 
-			for(int i=0; i<fields.length; i++) {
-				
+			for (int i = 0; i < fields.length; i++) {
+
 				TypeInformation<?> typeInfo;
-				if(isFieldInFirst[i]) {
-					if(fields[i] >= 0) {
-						typeInfo = ((TupleTypeInfo<?>)ds1.getType()).getTypeAt(fields[i]);
+				if (isFieldInFirst[i]) {
+					if (fields[i] >= 0) {
+						typeInfo = ((TupleTypeInfo<?>) ds1.getType()).getTypeAt(fields[i]);
 					} else {
 						typeInfo = ds1.getType();
 					}
 				} else {
-					if(fields[i] >= 0) {
-						typeInfo = ((TupleTypeInfo<?>)ds2.getType()).getTypeAt(fields[i]);
+					if (fields[i] >= 0) {
+						typeInfo = ((TupleTypeInfo<?>) ds2.getType()).getTypeAt(fields[i]);
 					} else {
 						typeInfo = ds2.getType();
 					}
@@ -1732,9 +1769,9 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 
 				fieldTypes[i] = typeInfo;
 			}
-			
+
 			return fieldTypes;
 		}
-				
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
index 057048c..f6336cd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
@@ -20,10 +20,10 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
 import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
@@ -41,8 +41,7 @@ public class KeyFunctions {
 	@SuppressWarnings("unchecked")
 	public static <T, K> org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> appendKeyExtractor(
 			org.apache.flink.api.common.operators.Operator<T> input,
-			SelectorFunctionKeys<T, K> key)
-	{
+			SelectorFunctionKeys<T, K> key) {
 
 		TypeInformation<T> inputType = key.getInputType();
 		TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
@@ -65,8 +64,7 @@ public class KeyFunctions {
 	public static <T, K1, K2> org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> appendKeyExtractor(
 			org.apache.flink.api.common.operators.Operator<T> input,
 			SelectorFunctionKeys<T, K1> key1,
-			SelectorFunctionKeys<T, K2> key2)
-	{
+			SelectorFunctionKeys<T, K2> key2) {
 
 		TypeInformation<T> inputType = key1.getInputType();
 		TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2);
@@ -88,8 +86,7 @@ public class KeyFunctions {
 
 	public static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> appendKeyRemover(
 			org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> inputWithKey,
-			SelectorFunctionKeys<T, K> key)
-	{
+			SelectorFunctionKeys<T, K> key) {
 
 		TypeInformation<T> inputType = key.getInputType();
 		TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
@@ -107,15 +104,13 @@ public class KeyFunctions {
 	}
 
 	public static <T, K> TypeInformation<Tuple2<K, T>> createTypeWithKey(
-			SelectorFunctionKeys<T, K> key)
-	{
+			SelectorFunctionKeys<T, K> key) {
 		return new TupleTypeInfo<>(key.getKeyType(), key.getInputType());
 	}
 
 	public static <T, K1, K2> TypeInformation<Tuple3<K1, K2, T>> createTypeWithKey(
 			SelectorFunctionKeys<T, K1> key1,
-			SelectorFunctionKeys<T, K2> key2)
-	{
+			SelectorFunctionKeys<T, K2> key2) {
 		return new TupleTypeInfo<>(key1.getKeyType(), key2.getKeyType(), key1.getInputType());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
index 2f398fb..a9d5672 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapOperator.java
@@ -29,52 +29,52 @@ import org.apache.flink.api.java.DataSet;
 /**
  * This operator represents the application of a "map" function on a data set, and the
  * result data set produced by the function.
- * 
+ *
  * @param <IN> The type of the data set consumed by the operator.
  * @param <OUT> The type of the data set created by the operator.
- * 
+ *
  * @see org.apache.flink.api.common.functions.MapFunction
  */
 @Public
 public class MapOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapOperator<IN, OUT>> {
-	
+
 	protected final MapFunction<IN, OUT> function;
-	
+
 	protected final String defaultName;
 
 	public MapOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
-		
+
 		this.defaultName = defaultName;
 		this.function = function;
 
 		UdfOperatorUtils.analyzeSingleInputUdf(this, MapFunction.class, defaultName, function, null);
 	}
-	
+
 	@Override
 	protected MapFunction<IN, OUT> getFunction() {
 		return function;
 	}
-	
+
 	@Override
 	protected MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
-		
-		String name = getName() != null ? getName() : "Map at "+defaultName;
+
+		String name = getName() != null ? getName() : "Map at " + defaultName;
 		// create operator
 		MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> po = new MapOperatorBase<IN, OUT, MapFunction<IN, OUT>>(function,
 				new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input
 		po.setInput(input);
 		// set parallelism
-		if(this.getParallelism() > 0) {
+		if (this.getParallelism() > 0) {
 			// use specified parallelism
 			po.setParallelism(this.getParallelism());
 		} else {
 			// if no parallelism has been specified, use parallelism of input operator to enable chaining
 			po.setParallelism(input.getParallelism());
 		}
-		
+
 		return po;
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
index 6bc48b8..e03f39d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/MapPartitionOperator.java
@@ -29,48 +29,48 @@ import org.apache.flink.api.java.DataSet;
 /**
  * This operator represents the application of a "mapPartition" function on a data set, and the
  * result data set produced by the function.
- * 
+ *
  * @param <IN> The type of the data set consumed by the operator.
  * @param <OUT> The type of the data set created by the operator.
- * 
+ *
  * @see MapPartitionFunction
  */
 @Public
 public class MapPartitionOperator<IN, OUT> extends SingleInputUdfOperator<IN, OUT, MapPartitionOperator<IN, OUT>> {
-	
+
 	protected final MapPartitionFunction<IN, OUT> function;
-	
+
 	protected final String defaultName;
-	
+
 	public MapPartitionOperator(DataSet<IN> input, TypeInformation<OUT> resultType, MapPartitionFunction<IN, OUT> function, String defaultName) {
 		super(input, resultType);
-		
+
 		this.function = function;
 		this.defaultName = defaultName;
 	}
-	
+
 	@Override
 	protected MapPartitionFunction<IN, OUT> getFunction() {
 		return function;
 	}
-	
+
 	@Override
 	protected MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
-		
-		String name = getName() != null ? getName() : "MapPartition at "+defaultName;
+
+		String name = getName() != null ? getName() : "MapPartition at " + defaultName;
 		// create operator
 		MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>> po = new MapPartitionOperatorBase<IN, OUT, MapPartitionFunction<IN, OUT>>(function, new UnaryOperatorInformation<IN, OUT>(getInputType(), getResultType()), name);
 		// set input
 		po.setInput(input);
 		// set parallelism
-		if(this.getParallelism() > 0) {
+		if (this.getParallelism() > 0) {
 			// use specified parallelism
 			po.setParallelism(this.getParallelism());
 		} else {
 			// if no parallelism has been specified, use parallelism of input operator to enable chaining
 			po.setParallelism(input.getParallelism());
 		}
-		
+
 		return po;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index e496c62..463dc35 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -28,7 +28,7 @@ import org.apache.flink.util.Preconditions;
 
 /**
  * Base class of all operators in the Java API.
- * 
+ *
  * @param <OUT> The type of the data set produced by this operator.
  * @param <O> The type of the operator, so that we can return it.
  */
@@ -36,21 +36,20 @@ import org.apache.flink.util.Preconditions;
 public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<OUT> {
 
 	protected String name;
-	
+
 	protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
 	protected ResourceSpec minResources = ResourceSpec.DEFAULT;
 
 	protected ResourceSpec preferredResources = ResourceSpec.DEFAULT;
 
-
 	protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) {
 		super(context, resultType);
 	}
-	
+
 	/**
 	 * Returns the type of the result of this operator.
-	 * 
+	 *
 	 * @return The result type of the operator.
 	 */
 	public TypeInformation<OUT> getResultType() {
@@ -60,16 +59,16 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 	/**
 	 * Returns the name of the operator. If no name has been set, it returns the name of the
 	 * operation, or the name of the class implementing the function of this operator.
-	 * 
+	 *
 	 * @return The name of the operator.
 	 */
 	public String getName() {
 		return name;
 	}
-	
+
 	/**
 	 * Returns the parallelism of this operator.
-	 * 
+	 *
 	 * @return The parallelism of this operator.
 	 */
 	public int getParallelism() {
@@ -100,7 +99,7 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 	 * Sets the name of this operator. This overrides the default name, which is either
 	 * a generated description of the operation (such as for example "Aggregate(1:SUM, 2:MIN)")
 	 * or the name the user-defined function or input/output format executed by the operator.
-	 * 
+	 *
 	 * @param newName The name for this operator.
 	 * @return The operator with a new name.
 	 */
@@ -110,11 +109,11 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 		O returnType = (O) this;
 		return returnType;
 	}
-	
+
 	/**
 	 * Sets the parallelism for this operator.
 	 * The parallelism must be 1 or more.
-	 * 
+	 *
 	 * @param parallelism The parallelism for this operator. A value equal to {@link ExecutionConfig#PARALLELISM_DEFAULT}
 	 *        will use the system default.
 	 * @return The operator with set parallelism.

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 22b9186..facadc0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -36,31 +36,32 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Used for translating data sets into corresponding operators.
+ */
 @Internal
 public class OperatorTranslation {
-	
-	/** The already translated operations */
+
+	/** The already translated operations. */
 	private Map<DataSet<?>, Operator<?>> translated = new HashMap<>();
-	
-	
+
 	public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
 		List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();
-		
+
 		for (DataSink<?> sink : sinks) {
 			planSinks.add(translate(sink));
 		}
-		
+
 		Plan p = new Plan(planSinks);
 		p.setJobName(jobName);
 		return p;
 	}
-	
-	
+
 	private <T> GenericDataSinkBase<T> translate(DataSink<T> sink) {
-		
+
 		// translate the input recursively
 		Operator<T> input = translate(sink.getDataSet());
-		
+
 		// translate the sink itself and connect it to the input
 		GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);
 
@@ -68,8 +69,7 @@ public class OperatorTranslation {
 
 		return translatedSink;
 	}
-	
-	
+
 	private <T> Operator<T> translate(DataSet<T> dataSet) {
 		while (dataSet instanceof NoOpOperator) {
 			dataSet = ((NoOpOperator<T>) dataSet).getInput();
@@ -89,9 +89,9 @@ public class OperatorTranslation {
 				return typedPrevious;
 			}
 		}
-		
+
 		Operator<T> dataFlowOp;
-		
+
 		if (dataSet instanceof DataSource) {
 			DataSource<T> dataSource = (DataSource<T>) dataSet;
 			dataFlowOp = dataSource.translateToDataFlow();
@@ -126,28 +126,27 @@ public class OperatorTranslation {
 		else {
 			throw new RuntimeException("Error while creating the data flow plan for the program: Unknown operator or data set type: " + dataSet);
 		}
-		
+
 		this.translated.put(dataSet, dataFlowOp);
-		
+
 		// take care of broadcast variables
 		translateBcVariables(dataSet, dataFlowOp);
-		
+
 		return dataFlowOp;
 	}
-	
-	
+
 	private <I, O> org.apache.flink.api.common.operators.Operator<O> translateSingleInputOperator(SingleInputOperator<?, ?, ?> op) {
-		
+
 		@SuppressWarnings("unchecked")
 		SingleInputOperator<I, O, ?> typedOp = (SingleInputOperator<I, O, ?>) op;
-		
+
 		@SuppressWarnings("unchecked")
 		DataSet<I> typedInput = (DataSet<I>) op.getInput();
-		
+
 		Operator<I> input = translate(typedInput);
-		
+
 		org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input);
-		
+
 		if (op instanceof UdfOperator<?>) {
 			@SuppressWarnings("unchecked")
 			SingleInputUdfOperator<I, O, ?> udfOp = (SingleInputUdfOperator<I, O, ?>) op;
@@ -165,29 +164,29 @@ public class OperatorTranslation {
 				unaryOp.setSemanticProperties(udfOp.getSemanticProperties());
 			}
 		}
-		
+
 		return dataFlowOp;
 	}
-	
+
 	private <I1, I2, O> org.apache.flink.api.common.operators.Operator<O> translateTwoInputOperator(TwoInputOperator<?, ?, ?, ?> op) {
-		
+
 		@SuppressWarnings("unchecked")
 		TwoInputOperator<I1, I2, O, ?> typedOp = (TwoInputOperator<I1, I2, O, ?>) op;
-		
+
 		@SuppressWarnings("unchecked")
 		DataSet<I1> typedInput1 = (DataSet<I1>) op.getInput1();
 		@SuppressWarnings("unchecked")
 		DataSet<I2> typedInput2 = (DataSet<I2>) op.getInput2();
-		
+
 		Operator<I1> input1 = translate(typedInput1);
 		Operator<I2> input2 = translate(typedInput2);
-		
+
 		org.apache.flink.api.common.operators.Operator<O> dataFlowOp = typedOp.translateToDataFlow(input1, input2);
-		
-		if (op instanceof UdfOperator<?> ) {
+
+		if (op instanceof UdfOperator<?>) {
 			@SuppressWarnings("unchecked")
 			TwoInputUdfOperator<I1, I2, O, ?> udfOp = (TwoInputUdfOperator<I1, I2, O, ?>) op;
-			
+
 			// set configuration parameters
 			Configuration opParams = udfOp.getParameters();
 			if (opParams != null) {
@@ -201,16 +200,14 @@ public class OperatorTranslation {
 				binaryOp.setSemanticProperties(udfOp.getSemanticProperties());
 			}
 		}
-		
+
 		return dataFlowOp;
 	}
-	
-	
+
 	private <T> BulkIterationBase<T> translateBulkIteration(BulkIterationResultSet<?> untypedIterationEnd) {
 		@SuppressWarnings("unchecked")
 		BulkIterationResultSet<T> iterationEnd = (BulkIterationResultSet<T>) untypedIterationEnd;
 		IterativeDataSet<T> iterationHead = iterationEnd.getIterationHead();
-
 		BulkIterationBase<T> iterationOperator =
 				new BulkIterationBase<>(new UnaryOperatorInformation<>(iterationEnd.getType(), iterationEnd.getType()), "Bulk Iteration");
 
@@ -224,28 +221,28 @@ public class OperatorTranslation {
 		iterationOperator.setNextPartialSolution(translatedBody);
 		iterationOperator.setMaximumNumberOfIterations(iterationHead.getMaxIterations());
 		iterationOperator.setInput(translate(iterationHead.getInput()));
-		
+
 		iterationOperator.getAggregators().addAll(iterationHead.getAggregators());
-		
-		if(iterationEnd.getTerminationCriterion() != null) {
+
+		if (iterationEnd.getTerminationCriterion() != null) {
 			iterationOperator.setTerminationCriterion(translate(iterationEnd.getTerminationCriterion()));
 		}
 
 		return iterationOperator;
 	}
-	
+
 	private <D, W> DeltaIterationBase<D, W> translateDeltaIteration(DeltaIterationResultSet<?, ?> untypedIterationEnd) {
 		@SuppressWarnings("unchecked")
 		DeltaIterationResultSet<D, W> iterationEnd = (DeltaIterationResultSet<D, W>) untypedIterationEnd;
 		DeltaIteration<D, W> iterationHead = iterationEnd.getIterationHead();
-		
+
 		String name = iterationHead.getName() == null ? "Unnamed Delta Iteration" : iterationHead.getName();
-		
+
 		DeltaIterationBase<D, W> iterationOperator = new DeltaIterationBase<>(new BinaryOperatorInformation<>(iterationEnd.getType(), iterationEnd.getWorksetType(), iterationEnd.getType()),
 				iterationEnd.getKeyPositions(), name);
-		
+
 		iterationOperator.setMaximumNumberOfIterations(iterationEnd.getMaxIterations());
-		
+
 		if (iterationHead.getParallelism() > 0) {
 			iterationOperator.setParallelism(iterationHead.getParallelism());
 		}
@@ -258,31 +255,31 @@ public class OperatorTranslation {
 
 		Operator<D> translatedSolutionSet = translate(iterationEnd.getNextSolutionSet());
 		Operator<W> translatedWorkset = translate(iterationEnd.getNextWorkset());
-		
+
 		iterationOperator.setNextWorkset(translatedWorkset);
 		iterationOperator.setSolutionSetDelta(translatedSolutionSet);
 
 		iterationOperator.setInitialSolutionSet(translate(iterationHead.getInitialSolutionSet()));
 		iterationOperator.setInitialWorkset(translate(iterationHead.getInitialWorkset()));
-		
+
 		// register all aggregators
 		iterationOperator.getAggregators().addAll(iterationHead.getAggregators());
-		
+
 		iterationOperator.setSolutionSetUnManaged(iterationHead.isSolutionSetUnManaged());
-		
+
 		return iterationOperator;
 	}
-	
+
 	private void translateBcVariables(DataSet<?> setOrOp, Operator<?> dataFlowOp) {
 		// check if this is actually an operator that could have broadcast variables
 		if (setOrOp instanceof UdfOperator) {
 			if (!(dataFlowOp instanceof AbstractUdfOperator<?, ?>)) {
 				throw new RuntimeException("Error while creating the data flow plan for the program: A UDF operation was not translated to a UDF operator.");
 			}
-			
+
 			UdfOperator<?> udfOp = (UdfOperator<?>) setOrOp;
 			AbstractUdfOperator<?, ?> udfDataFlowOp = (AbstractUdfOperator<?, ?>) dataFlowOp;
-		
+
 			for (Map.Entry<String, DataSet<?>> bcVariable : udfOp.getBroadcastSets().entrySet()) {
 				Operator<?> bcInput = translate(bcVariable.getValue());
 				udfDataFlowOp.setBroadcastVariable(bcVariable.getKey(), bcInput);


Mime
View raw message