flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dwysakow...@apache.org
Subject [2/6] flink git commit: [FLINK-7181] Activate checkstyle flink-java/operators/*
Date Tue, 25 Jul 2017 08:19:04 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index b3234b8..8a9a72d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.api.java.operators;
 
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -33,7 +33,6 @@ import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Preconditions;
 
@@ -46,7 +45,7 @@ import java.util.Arrays;
  */
 @Public
 public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOperator<T>> {
-	
+
 	private final Keys<T> pKeys;
 	private final PartitionMethod pMethod;
 	private final String partitionLocationName;
@@ -54,7 +53,6 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 	private final DataDistribution distribution;
 	private Order[] orders;
 
-
 	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, String partitionLocationName) {
 		this(input, pMethod, pKeys, null, null, null, partitionLocationName);
 	}
@@ -66,37 +64,35 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 	public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, String partitionLocationName) {
 		this(input, pMethod, null, null, null, null, partitionLocationName);
 	}
-	
+
 	public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customPartitioner, String partitionLocationName) {
 		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, null, null, partitionLocationName);
 	}
-	
+
 	public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<P> customPartitioner,
-			TypeInformation<P> partitionerTypeInfo, String partitionLocationName)
-	{
+			TypeInformation<P> partitionerTypeInfo, String partitionLocationName) {
 		this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, partitionerTypeInfo, null, partitionLocationName);
 	}
-	
+
 	private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner,
-			TypeInformation<P> partitionerTypeInfo, DataDistribution distribution, String partitionLocationName)
-	{
+			TypeInformation<P> partitionerTypeInfo, DataDistribution distribution, String partitionLocationName) {
 		super(input, input.getType());
-		
+
 		Preconditions.checkNotNull(pMethod);
 		Preconditions.checkArgument(pKeys != null || pMethod == PartitionMethod.REBALANCE, "Partitioning requires keys");
 		Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM || customPartitioner != null, "Custom partioning requires a partitioner.");
 		Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition.");
-		
+
 		if (distribution != null) {
 			Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(), "The distribution must provide at least as many fields as flat key fields are specified.");
 			Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), Arrays.copyOfRange(distribution.getKeyTypes(), 0, pKeys.getNumberOfKeyFields())),
 					"The types of the flat key fields must be equal to the types of the fields of the distribution.");
 		}
-		
+
 		if (customPartitioner != null) {
 			pKeys.validateCustomPartitioner(customPartitioner, partitionerTypeInfo);
 		}
-		
+
 		this.pMethod = pMethod;
 		this.pKeys = pKeys;
 		this.partitionLocationName = partitionLocationName;
@@ -121,43 +117,43 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 
 		return this;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Properties
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Gets the custom partitioner from this partitioning.
-	 * 
+	 *
 	 * @return The custom partitioner.
 	 */
 	@Internal
 	public Partitioner<?> getCustomPartitioner() {
 		return customPartitioner;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Translation
 	// --------------------------------------------------------------------------------------------
-	
+
 	protected org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input) {
-	
+
 		String name = "Partition at " + partitionLocationName;
-		
+
 		// distinguish between partition types
 		if (pMethod == PartitionMethod.REBALANCE) {
-			
+
 			UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<>(getType(), getType());
 			PartitionOperatorBase<T> rebalancedInput = new PartitionOperatorBase<>(operatorInfo, pMethod, name);
 			rebalancedInput.setInput(input);
 			rebalancedInput.setParallelism(getParallelism());
-			
+
 			return rebalancedInput;
-		} 
+		}
 		else if (pMethod == PartitionMethod.HASH || pMethod == PartitionMethod.CUSTOM || pMethod == PartitionMethod.RANGE) {
-			
+
 			if (pKeys instanceof Keys.ExpressionKeys) {
-				
+
 				int[] logicalKeyPositions = pKeys.computeLogicalKeyPositions();
 				UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<>(getType(), getType());
 				PartitionOperatorBase<T> partitionedInput = new PartitionOperatorBase<>(operatorInfo, pMethod, logicalKeyPositions, name);
@@ -166,11 +162,11 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 				partitionedInput.setDistribution(distribution);
 				partitionedInput.setCustomPartitioner(customPartitioner);
 				partitionedInput.setOrdering(computeOrdering(pKeys, orders));
-				
+
 				return partitionedInput;
 			}
 			else if (pKeys instanceof Keys.SelectorFunctionKeys) {
-				
+
 				@SuppressWarnings("unchecked")
 				Keys.SelectorFunctionKeys<T, ?> selectorKeys = (Keys.SelectorFunctionKeys<T, ?>) pKeys;
 				return translateSelectorFunctionPartitioner(selectorKeys, pMethod, name, input, getParallelism(),
@@ -179,8 +175,8 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 			else {
 				throw new UnsupportedOperationException("Unrecognized key type.");
 			}
-			
-		} 
+
+		}
 		else {
 			throw new UnsupportedOperationException("Unsupported partitioning method: " + pMethod.name());
 		}
@@ -217,8 +213,7 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 		Operator<T> input,
 		int partitionDop,
 		Partitioner<?> customPartitioner,
-		Order[] orders)
-	{
+		Order[] orders) {
 		final SelectorFunctionKeys<T, K> keys = (SelectorFunctionKeys<T, K>) rawKeys;
 		TypeInformation<Tuple2<K, T>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
 
@@ -234,5 +229,4 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 		return KeyFunctions.appendKeyRemover(keyedPartitionedInput, keys);
 	}
 
-	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index 0068582..d8cc916 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java.operators;
 
-import java.util.Arrays;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
@@ -31,34 +29,58 @@ import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.operators.translation.PlanProjectOperator;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.util.Preconditions;
 
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
-import org.apache.flink.api.java.tuple.*;
-//CHECKSTYLE.ON: AvoidStarImport
+import java.util.Arrays;
 
 /**
  * This operator represents the application of a projection operation on a data set, and the
  * result data set produced by the function.
- * 
+ *
  * @param <IN> The type of the data set projected by the operator.
  * @param <OUT> The type of data set that is the result of the projection.
  */
 @Public
-public class ProjectOperator<IN, OUT extends Tuple> 
+public class ProjectOperator<IN, OUT extends Tuple>
 	extends SingleInputOperator<IN, OUT, ProjectOperator<IN, OUT>> {
-	
+
 	protected final int[] fields;
 
 	public ProjectOperator(DataSet<IN> input, int[] fields, TupleTypeInfo<OUT> returnType) {
 		super(input, returnType);
-	
+
 		this.fields = fields;
 	}
 
 	@Override
-	protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN,OUT>> translateToDataFlow(Operator<IN> input) {
+	protected org.apache.flink.api.common.operators.base.MapOperatorBase<IN, OUT, MapFunction<IN, OUT>> translateToDataFlow(Operator<IN> input) {
 		String name = getName() != null ? getName() : "Projection " + Arrays.toString(fields);
 		// create operator
 		PlanProjectOperator<IN, OUT> ppo = new PlanProjectOperator<IN, OUT>(fields, name, getInputType(), getResultType(), context.getConfig());
@@ -70,6 +92,7 @@ public class ProjectOperator<IN, OUT extends Tuple>
 
 		return ppo;
 	}
+
 	/**
 	 * @deprecated Deprecated method only kept for compatibility.
 	 */
@@ -77,61 +100,64 @@ public class ProjectOperator<IN, OUT extends Tuple>
 	@Deprecated
 	@PublicEvolving
 	public <R extends Tuple> ProjectOperator<IN, R> types(Class<?>... types) {
-		TupleTypeInfo<R> typeInfo = (TupleTypeInfo<R>)this.getResultType();
+		TupleTypeInfo<R> typeInfo = (TupleTypeInfo<R>) this.getResultType();
 
-		if(types.length != typeInfo.getArity()) {
+		if (types.length != typeInfo.getArity()) {
 			throw new InvalidProgramException("Provided types do not match projection.");
 		}
-		for (int i=0; i<types.length; i++) {
+		for (int i = 0; i < types.length; i++) {
 			Class<?> typeClass = types[i];
 			if (!typeClass.equals(typeInfo.getTypeAt(i).getTypeClass())) {
-				throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection");
+				throw new InvalidProgramException("Provided type " + typeClass.getSimpleName() + " at position " + i + " does not match projection");
 			}
 		}
 		return (ProjectOperator<IN, R>) this;
 	}
 
+	/**
+	 * A projection of {@link DataSet}.
+	 *
+	 * @param <T>
+	 */
 	@Internal
 	public static class Projection<T> {
-		
+
 		private final DataSet<T> ds;
 		private int[] fieldIndexes;
-		
+
 		public Projection(DataSet<T> ds, int[] fieldIndexes) {
-			
-			if(!(ds.getType() instanceof TupleTypeInfo)) {
+
+			if (!(ds.getType() instanceof TupleTypeInfo)) {
 				throw new UnsupportedOperationException("project() can only be applied to DataSets of Tuples.");
 			}
-			
-			if(fieldIndexes.length == 0) {
+
+			if (fieldIndexes.length == 0) {
 				throw new IllegalArgumentException("project() needs to select at least one (1) field.");
-			} else if(fieldIndexes.length > Tuple.MAX_ARITY - 1) {
+			} else if (fieldIndexes.length > Tuple.MAX_ARITY - 1) {
 				throw new IllegalArgumentException(
-						"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
+					"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
 			}
-			
+
 			int maxFieldIndex = ds.getType().getArity();
 			for (int fieldIndexe : fieldIndexes) {
 				Preconditions.checkElementIndex(fieldIndexe, maxFieldIndex);
 			}
-			
+
 			this.ds = ds;
 			this.fieldIndexes = fieldIndexes;
 		}
-		
-		
-		// --------------------------------------------------------------------------------------------	
+
+		// --------------------------------------------------------------------------------------------
 		// The following lines are generated.
-		// --------------------------------------------------------------------------------------------	
-		// BEGIN_OF_TUPLE_DEPENDENT_CODE	
-	// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
+		// --------------------------------------------------------------------------------------------
+		// BEGIN_OF_TUPLE_DEPENDENT_CODE
+		// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
 
 		/**
 		 * Chooses a projectTupleX according to the length of
-		 * {@link org.apache.flink.api.java.operators.ProjectOperator.Projection#fieldIndexes} 
-		 * 
+		 * {@link org.apache.flink.api.java.operators.ProjectOperator.Projection#fieldIndexes}.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see org.apache.flink.api.java.operators.ProjectOperator.Projection
 		 */
 		@SuppressWarnings("unchecked")
@@ -171,10 +197,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -186,10 +211,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -201,10 +225,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -216,10 +239,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -231,10 +253,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -246,10 +267,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -261,10 +281,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -276,10 +295,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -291,10 +309,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -306,10 +323,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -321,10 +337,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -336,10 +351,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -351,10 +365,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -366,10 +379,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -381,10 +393,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -396,10 +407,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -411,10 +421,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -426,10 +435,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -441,10 +449,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -456,10 +463,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -471,10 +477,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -486,10 +491,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -501,10 +505,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -516,10 +519,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -531,10 +533,9 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		}
 
 		/**
-		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. 
-		 * 
+		 * Projects a {@link Tuple} {@link DataSet} to the previously selected fields.
+		 *
 		 * @return The projected DataSet.
-		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
@@ -547,20 +548,18 @@ public class ProjectOperator<IN, OUT extends Tuple>
 
 		// END_OF_TUPLE_DEPENDENT_CODE
 		// -----------------------------------------------------------------------------------------
-		
-		
-		
+
 		private TypeInformation<?>[] extractFieldTypes(int[] fields, TypeInformation<?> inType) {
-			
+
 			TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType;
 			TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
-					
-			for(int i=0; i<fields.length; i++) {					
+
+			for (int i = 0; i < fields.length; i++) {
 				fieldTypes[i] = inTupleType.getTypeAt(fields[i]);
 			}
-			
+
 			return fieldTypes;
 		}
-		
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index 42dcf05..6d0c58b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -23,58 +23,56 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.SemanticPropUtil;
-import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.DataSet;
 
 /**
  * This operator represents the application of a "reduce" function on a data set, and the
  * result data set produced by the function.
- * 
+ *
  * @param <IN> The type of the data set reduced by the operator.
- * 
+ *
  * @see org.apache.flink.api.common.functions.ReduceFunction
  */
 @Public
 public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOperator<IN>> {
-	
+
 	private final ReduceFunction<IN> function;
-	
+
 	private final Grouping<IN> grouper;
-	
+
 	private final String defaultName;
 
 	// should be null in case of an all reduce
 	private CombineHint hint;
-	
+
 	/**
-	 * 
 	 * This is the case for a reduce-all case (in contrast to the reduce-per-group case).
-	 * 
+	 *
 	 * @param input
 	 * @param function
 	 */
 	public ReduceOperator(DataSet<IN> input, ReduceFunction<IN> function, String defaultName) {
 		super(input, input.getType());
-		
+
 		this.function = function;
 		this.grouper = null;
 		this.defaultName = defaultName;
 		this.hint = null;
 	}
-	
-	
+
 	public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, String defaultName) {
 		super(input.getInputDataSet(), input.getInputDataSet().getType());
-		
+
 		this.function = function;
 		this.grouper = input;
 		this.defaultName = defaultName;
@@ -82,7 +80,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 
 		UdfOperatorUtils.analyzeSingleInputUdf(this, ReduceFunction.class, defaultName, function, grouper.keys);
 	}
-	
+
 	@Override
 	protected ReduceFunction<IN> getFunction() {
 		return function;
@@ -95,12 +93,12 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		SingleInputSemanticProperties props = super.getSemanticProperties();
 
 		// offset semantic information by extracted key fields
-		if(props != null &&
+		if (props != null &&
 				this.grouper != null &&
 				this.grouper.keys instanceof SelectorFunctionKeys) {
 
-			int offset = ((SelectorFunctionKeys<?,?>) this.grouper.keys).getKeyType().getTotalFields();
-			if(this.grouper instanceof SortedGrouping) {
+			int offset = ((SelectorFunctionKeys<?, ?>) this.grouper.keys).getKeyType().getTotalFields();
+			if (this.grouper instanceof SortedGrouping) {
 				offset += ((SortedGrouping<?>) this.grouper).getSortSelectionFunctionKey().getKeyType().getTotalFields();
 			}
 			props = SemanticPropUtil.addSourceFieldOffset(props, this.getInputType().getTotalFields(), offset);
@@ -111,25 +109,25 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 
 	@Override
 	protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
-		
-		String name = getName() != null ? getName() : "Reduce at "+defaultName;
-		
+
+		String name = getName() != null ? getName() : "Reduce at " + defaultName;
+
 		// distinguish between grouped reduce and non-grouped reduce
 		if (grouper == null) {
 			// non grouped reduce
 			UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getInputType());
 			ReduceOperatorBase<IN, ReduceFunction<IN>> po =
 					new ReduceOperatorBase<>(function, operatorInfo, new int[0], name);
-			
+
 			po.setInput(input);
 			// the parallelism for a non grouped reduce can only be 1
 			po.setParallelism(1);
-			
+
 			return po;
 		}
-		
+
 		if (grouper.getKeys() instanceof SelectorFunctionKeys) {
-			
+
 			// reduce with key selector function
 			@SuppressWarnings("unchecked")
 			SelectorFunctionKeys<IN, ?> selectorKeys = (SelectorFunctionKeys<IN, ?>) grouper.getKeys();
@@ -141,19 +139,19 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 			return po;
 		}
 		else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {
-			
+
 			// reduce with field positions
 			int[] logicalKeyPositions = grouper.getKeys().computeLogicalKeyPositions();
 			UnaryOperatorInformation<IN, IN> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getInputType());
 			ReduceOperatorBase<IN, ReduceFunction<IN>> po =
 					new ReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
-			
+
 			po.setCustomPartitioner(grouper.getCustomPartitioner());
-			
+
 			po.setInput(input);
 			po.setParallelism(getParallelism());
 			po.setCombineHint(hint);
-			
+
 			return po;
 		}
 		else {
@@ -164,7 +162,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	/**
 	 * Sets the strategy to use for the combine phase of the reduce.
 	 *
-	 * If this method is not called, then the default hint will be used.
+	 * <p>If this method is not called, then the default hint will be used.
 	 * ({@link org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint#OPTIMIZER_CHOOSES})
 	 *
 	 * @param strategy The hint to use.
@@ -177,7 +175,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	private static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateSelectorFunctionReducer(
 		SelectorFunctionKeys<T, ?> rawKeys,
 		ReduceFunction<T> function,
@@ -185,14 +183,13 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 		String name,
 		Operator<T> input,
 		int parallelism,
-		CombineHint hint)
-	{
+		CombineHint hint) {
 		@SuppressWarnings("unchecked")
 		final SelectorFunctionKeys<T, K> keys = (SelectorFunctionKeys<T, K>) rawKeys;
-		
+
 		TypeInformation<Tuple2<K, T>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
 		Operator<Tuple2<K, T>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
-		
+
 		PlanUnwrappingReduceOperator<T, K> reducer = new PlanUnwrappingReduceOperator<>(function, keys, name, inputType, typeInfoWithKey);
 		reducer.setInput(keyedInput);
 		reducer.setParallelism(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java
index 359f4b8..b718a56 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputOperator.java
@@ -24,47 +24,46 @@ import org.apache.flink.api.java.DataSet;
 
 /**
  * Base class for operations that operates on a single input data set.
- * 
+ *
  * @param <IN> The data type of the input data set.
  * @param <OUT> The data type of the returned data set.
  */
 @Public
 public abstract class SingleInputOperator<IN, OUT, O extends SingleInputOperator<IN, OUT, O>> extends Operator<OUT, O> {
-	
+
 	private final DataSet<IN> input;
-	
-	
+
 	protected SingleInputOperator(DataSet<IN> input, TypeInformation<OUT> resultType) {
 		super(input.getExecutionEnvironment(), resultType);
 		this.input = input;
 	}
-	
+
 	/**
 	 * Gets the data set that this operation uses as its input.
-	 * 
+	 *
 	 * @return The data set that this operation uses as its input.
 	 */
 	public DataSet<IN> getInput() {
 		return this.input;
 	}
-	
+
 	/**
 	 * Gets the type information of the data type of the input data set.
 	 * This method returns equivalent information as {@code getInput().getType()}.
-	 * 
+	 *
 	 * @return The input data type.
 	 */
 	public TypeInformation<IN> getInputType() {
 		return this.input.getType();
 	}
-	
+
 	/**
 	 * Translates this operation to a data flow operator of the common data flow API.
-	 * 
+	 *
 	 * @param input The data flow operator that produces this operation's input data.
 	 * @return The translated data flow operator.
 	 */
 	protected abstract org.apache.flink.api.common.operators.Operator<OUT> translateToDataFlow(
 			org.apache.flink.api.common.operators.Operator<IN> input);
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
index eb485fe..4ce44aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SingleInputUdfOperator.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.api.java.operators;
 
-import java.lang.annotation.Annotation;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
@@ -39,6 +33,12 @@ import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 
+import java.lang.annotation.Annotation;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -46,16 +46,15 @@ import static java.util.Objects.requireNonNull;
  * user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that
  * have one input (such as {@link org.apache.flink.api.common.functions.RichMapFunction} or
  * {@link org.apache.flink.api.common.functions.RichReduceFunction}).
- * <p>
- * This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization
+ *
+ * <p>This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization
  * through configuration objects, and semantic properties.
  * @param <IN> The data type of the input data set.
  * @param <OUT> The data type of the returned data set.
  */
 @Public
 public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOperator<IN, OUT, O>>
-	extends SingleInputOperator<IN, OUT, O> implements UdfOperator<O>
-{
+	extends SingleInputOperator<IN, OUT, O> implements UdfOperator<O> {
 	private Configuration parameters;
 
 	private Map<String, DataSet<?>> broadcastVariables;
@@ -77,8 +76,7 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	protected SingleInputUdfOperator(DataSet<IN> input, TypeInformation<OUT> resultType) {
 		super(input, resultType);
 	}
-	
-	
+
 	protected abstract Function getFunction();
 
 	// --------------------------------------------------------------------------------------------
@@ -102,7 +100,7 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 		if (name == null) {
 			throw new IllegalArgumentException("Broadcast variable name must not be null.");
 		}
-		
+
 		if (this.broadcastVariables == null) {
 			this.broadcastVariables = new HashMap<String, DataSet<?>>();
 		}
@@ -115,46 +113,34 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	}
 
 	/**
-	 * <p>
 	 * Adds semantic information about forwarded fields of the user-defined function.
 	 * The forwarded fields information declares fields which are never modified by the function and
 	 * which are forwarded at the same position to the output or unchanged copied to another position in the output.
-	 * </p>
 	 *
-	 * <p>
-	 * Fields that are forwarded at the same position are specified by their position.
+	 * <p>Fields that are forwarded at the same position are specified by their position.
 	 * The specified position must be valid for the input and output data type and have the same type.
 	 * For example <code>withForwardedFields("f2")</code> declares that the third field of a Java input tuple is
 	 * copied to the third field of an output tuple.
-	 * </p>
 	 *
-	 * <p>
-	 * Fields which are unchanged copied to another position in the output are declared by specifying the
+	 * <p>Fields which are unchanged copied to another position in the output are declared by specifying the
 	 * source field reference in the input and the target field reference in the output.
 	 * {@code withForwardedFields("f0->f2")} denotes that the first field of the Java input tuple is
 	 * unchanged copied to the third field of the Java output tuple. When using a wildcard ("*") ensure that
 	 * the number of declared fields and their types in input and output type match.
-	 * </p>
 	 *
-	 * <p>
-	 * Multiple forwarded fields can be annotated in one ({@code withForwardedFields("f2; f3->f0; f4")})
+	 * <p>Multiple forwarded fields can be annotated in one ({@code withForwardedFields("f2; f3->f0; f4")})
 	 * or separate Strings ({@code withForwardedFields("f2", "f3->f0", "f4")}).
 	 * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for
 	 * details on field references such as nested fields and wildcard.
-	 * </p>
 	 *
-	 * <p>
-	 * It is not possible to override existing semantic information about forwarded fields which was
+	 * <p>It is not possible to override existing semantic information about forwarded fields which was
 	 * for example added by a {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields} class annotation.
-	 * </p>
 	 *
-	 * <p>
-	 * <b>NOTE: Adding semantic information for functions is optional!
+	 * <p><b>NOTE: Adding semantic information for functions is optional!
 	 * If used correctly, semantic information can help the Flink optimizer to generate more efficient execution plans.
 	 * However, incorrect semantic information can cause the optimizer to generate incorrect execution plans which compute wrong results!
 	 * So be careful when adding semantic information.
 	 * </b>
-	 * </p>
 	 *
 	 * @param forwardedFields A list of field forward expressions.
 	 * @return This operator with annotated forwarded field information.
@@ -164,17 +150,17 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	 */
 	public O withForwardedFields(String... forwardedFields) {
 
-		if(this.udfSemantics == null) {
+		if (this.udfSemantics == null) {
 			// extract semantic properties from function annotations
 			setSemanticProperties(extractSemanticAnnotations(getFunction().getClass()));
 		}
 
-		if(this.udfSemantics == null
+		if (this.udfSemantics == null
 				|| this.analyzedUdfSemantics) { // discard analyzed semantic properties
 			setSemanticProperties(new SingleInputSemanticProperties());
 			SemanticPropUtil.getSemanticPropsSingleFromString(this.udfSemantics, forwardedFields, null, null, this.getInputType(), this.getResultType());
 		} else {
-			if(udfWithForwardedFieldsAnnotation(getFunction().getClass())) {
+			if (udfWithForwardedFieldsAnnotation(getFunction().getClass())) {
 				// refuse semantic information as it would override the function annotation
 				throw new SemanticProperties.InvalidSemanticAnnotationException("Forwarded field information " +
 						"has already been added by a function annotation for this operator. " +
@@ -202,7 +188,7 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	 * <p>Classes can be used as type hints for non-generic types (classes without generic parameters),
 	 * but not for generic types like for example Tuples. For those generic types, please
 	 * use the {@link #returns(TypeHint)} method.
-	 * 
+	 *
 	 * <p>Use this method the following way:
 	 * <pre>{@code
 	 *     DataSet<String[]> result =
@@ -215,7 +201,7 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	 */
 	public O returns(Class<OUT> typeClass) {
 		requireNonNull(typeClass, "type class must not be null");
-		
+
 		try {
 			return returns(TypeInformation.of(typeClass));
 		}
@@ -225,7 +211,7 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 					"please use the 'returns(TypeHint)' method instead.", e);
 		}
 	}
-	
+
 	/**
 	 * Adds a type information hint about the return type of this operator. This method
 	 * can be used in cases where Flink cannot determine automatically what the produced
@@ -244,7 +230,7 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	 */
 	public O returns(TypeHint<OUT> typeHint) {
 		requireNonNull(typeHint, "TypeHint must not be null");
-	
+
 		try {
 			return returns(TypeInformation.of(typeHint));
 		}
@@ -259,7 +245,7 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	 * can be used in cases where Flink cannot determine automatically what the produced
 	 * type of a function is. That can be the case if the function uses generic type variables
 	 * in the return type that cannot be inferred from the input type.
-	 * 
+	 *
 	 * <p>In most cases, the methods {@link #returns(Class)} and {@link #returns(TypeHint)}
 	 * are preferable.
 	 *
@@ -268,22 +254,22 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	 */
 	public O returns(TypeInformation<OUT> typeInfo) {
 		requireNonNull(typeInfo, "TypeInformation must not be null");
-		
+
 		fillInType(typeInfo);
 		@SuppressWarnings("unchecked")
 		O returnType = (O) this;
 		return returnType;
 	}
-	
+
 	/**
-	 * Adds a type information hint about the return type of this operator. 
-	 * 
-	 * <p>
-	 * Type hints are important in cases where the Java compiler
+	 * Adds a type information hint about the return type of this operator.
+	 *
+	 *
+	 * <p>Type hints are important in cases where the Java compiler
 	 * throws away generic type information necessary for efficient execution.
-	 * 
-	 * <p>
-	 * This method takes a type information string that will be parsed. A type information string can contain the following
+	 *
+	 *
+	 * <p>This method takes a type information string that will be parsed. A type information string can contain the following
 	 * types:
 	 *
 	 * <ul>
@@ -303,13 +289,13 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 	 * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
 	 * </ul>
 	 *
-	 * Example:
+	 * <p>Example:
 	 * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
 	 *
 	 * @param typeInfoString
 	 *            type information string to be parsed
 	 * @return This operator with a given return type hint.
-	 * 
+	 *
 	 * @deprecated Please use {@link #returns(Class)} or {@link #returns(TypeHint)} instead.
 	 */
 	@Deprecated
@@ -320,7 +306,7 @@ public abstract class SingleInputUdfOperator<IN, OUT, O extends SingleInputUdfOp
 		}
 		return returns(TypeInfoParser.<OUT>parse(typeInfoString));
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Accessors
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
index 7f30a30..862ad89 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortPartitionOperator.java
@@ -58,7 +58,6 @@ public class SortPartitionOperator<T> extends SingleInputOperator<T, T, SortPart
 		this.sortLocationName = sortLocationName;
 	}
 
-
 	public SortPartitionOperator(DataSet<T> dataSet, int sortField, Order sortOrder, String sortLocationName) {
 		this(dataSet, sortLocationName);
 		this.useKeySelector = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
index 11645df..2e3709d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java
@@ -19,40 +19,40 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.FirstReducer;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.InvalidProgramException;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
+
 /**
- * SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br>
- * The following transformation can be applied on sorted groups:
+ * SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.
+ *
+ * <p>The following transformation can be applied on sorted groups:
  * <ul>
  * 	<li>{@link SortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)},</li>
  * </ul>
- * 
+ *
  * @param <T> The type of the elements of the sorted and grouped DataSet.
  */
 @Public
 public class SortedGrouping<T> extends Grouping<T> {
-	
+
 	private int[] groupSortKeyPositions;
 	private Order[] groupSortOrders;
 	private Keys.SelectorFunctionKeys<T, ?> groupSortSelectorFunctionKey = null;
-	
+
 	/*
 	 * int sorting keys for tuples
 	 */
@@ -70,7 +70,7 @@ public class SortedGrouping<T> extends Grouping<T> {
 		this.groupSortOrders = new Order[groupSortKeyPositions.length];
 		Arrays.fill(this.groupSortOrders, order);
 	}
-	
+
 	/*
 	 * String sorting for Pojos and tuples
 	 */
@@ -99,8 +99,8 @@ public class SortedGrouping<T> extends Grouping<T> {
 			throw new InvalidProgramException("Sorting on KeySelector keys only works with KeySelector grouping.");
 		}
 		TypeInformation<?> sortKeyType = keySelector.getKeyType();
-		if(!sortKeyType.isSortKeyType()) {
-			throw new InvalidProgramException("Key type " + sortKeyType +" is not sortable.");
+		if (!sortKeyType.isSortKeyType()) {
+			throw new InvalidProgramException("Key type " + sortKeyType + " is not sortable.");
 		}
 
 		this.groupSortKeyPositions = keySelector.computeLogicalKeyPositions();
@@ -112,13 +112,13 @@ public class SortedGrouping<T> extends Grouping<T> {
 		this.groupSortOrders = new Order[groupSortKeyPositions.length];
 		Arrays.fill(this.groupSortOrders, order);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	protected int[] getGroupSortKeyPositions() {
 		return this.groupSortKeyPositions;
 	}
-	
+
 	protected Order[] getGroupSortOrders() {
 		return this.groupSortOrders;
 	}
@@ -126,24 +126,24 @@ public class SortedGrouping<T> extends Grouping<T> {
 	protected Ordering getGroupOrdering() {
 
 		Ordering o = new Ordering();
-		for(int i=0; i < this.groupSortKeyPositions.length; i++) {
+		for (int i = 0; i < this.groupSortKeyPositions.length; i++) {
 			o.appendOrdering(this.groupSortKeyPositions[i], null, this.groupSortOrders[i]);
 		}
 
 		return o;
 	}
-	
+
 	/**
 	 * Uses a custom partitioner for the grouping.
-	 * 
+	 *
 	 * @param partitioner The custom partitioner.
 	 * @return The grouping object itself, to allow for method chaining.
 	 */
 	public SortedGrouping<T> withPartitioner(Partitioner<?> partitioner) {
 		Preconditions.checkNotNull(partitioner);
-		
+
 		getKeys().validateCustomPartitioner(partitioner, null);
-		
+
 		this.customPartitioner = partitioner;
 		return this;
 	}
@@ -153,14 +153,15 @@ public class SortedGrouping<T> extends Grouping<T> {
 	}
 
 	/**
-	 * Applies a GroupReduce transformation on a grouped and sorted {@link DataSet}.<br>
-	 * The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} for each group of the DataSet.
+	 * Applies a GroupReduce transformation on a grouped and sorted {@link DataSet}.
+	 *
+	 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} for each group of the DataSet.
 	 * A GroupReduceFunction can iterate over all elements of a group and emit any
 	 *   number of output elements including none.
-	 * 
+	 *
 	 * @param reducer The GroupReduceFunction that is applied on each group of the DataSet.
 	 * @return A GroupReduceOperator that represents the reduced DataSet.
-	 * 
+	 *
 	 * @see org.apache.flink.api.common.functions.RichGroupReduceFunction
 	 * @see GroupReduceOperator
 	 * @see DataSet
@@ -196,33 +197,34 @@ public class SortedGrouping<T> extends Grouping<T> {
 		return new GroupCombineOperator<>(this, resultType, inputDataSet.clean(combiner), Utils.getCallLocationName());
 	}
 
-	
 	/**
-	 * Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.<br>
+	 * Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.
 	 * @param n The desired number of elements for each group.
 	 * @return A GroupReduceOperator that represents the DataSet containing the elements.
 	*/
 	public GroupReduceOperator<T, T> first(int n) {
-		if(n < 1) {
+		if (n < 1) {
 			throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
 		}
-		
+
 		return reduceGroup(new FirstReducer<T>(n));
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Group Operations
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
-	 * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group on the specified field in the specified {@link Order}.<br>
-	 * <b>Note: Only groups of Tuple or Pojo elements can be sorted.</b><br>
-	 * Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls.
-	 * 
+	 * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group on the specified field in the specified {@link Order}.
+	 *
+	 * <p><b>Note: Only groups of Tuple or Pojo elements can be sorted.</b>
+	 *
+	 * <p>Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls.
+	 *
 	 * @param field The Tuple field on which the group is sorted.
 	 * @param order The Order in which the specified Tuple field is sorted.
 	 * @return A SortedGrouping with specified order of group element.
-	 * 
+	 *
 	 * @see org.apache.flink.api.java.tuple.Tuple
 	 * @see Order
 	 */
@@ -241,9 +243,11 @@ public class SortedGrouping<T> extends Grouping<T> {
 	}
 
 	/**
-	 * Sorts {@link org.apache.flink.api.java.tuple.Tuple} or POJO elements within a group on the specified field in the specified {@link Order}.<br>
-	 * <b>Note: Only groups of Tuple or Pojo elements can be sorted.</b><br>
-	 * Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)} calls.
+	 * Sorts {@link org.apache.flink.api.java.tuple.Tuple} or POJO elements within a group on the specified field in the specified {@link Order}.
+	 *
+	 * <p><b>Note: Only groups of Tuple or Pojo elements can be sorted.</b>
+	 *
+	 * <p>Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)} calls.
 	 *
 	 * @param field The Tuple or Pojo field on which the group is sorted.
 	 * @param order The Order in which the specified field is sorted.
@@ -265,17 +269,17 @@ public class SortedGrouping<T> extends Grouping<T> {
 		addSortGroupInternal(ek, order);
 		return this;
 	}
-	
+
 	private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) {
 		Preconditions.checkArgument(order != null, "Order can not be null");
 		int[] additionalKeyPositions = ek.computeLogicalKeyPositions();
-		
+
 		int newLength = this.groupSortKeyPositions.length + additionalKeyPositions.length;
 		this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength);
 		this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength);
 		int pos = newLength - additionalKeyPositions.length;
 		int off = newLength - additionalKeyPositions.length;
-		for(;pos < newLength; pos++) {
+		for (; pos < newLength; pos++) {
 			this.groupSortKeyPositions[pos] = additionalKeyPositions[pos - off];
 			this.groupSortOrders[pos] = order; // use the same order
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
index 28dec32..b915849 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputOperator.java
@@ -25,18 +25,17 @@ import org.apache.flink.util.Preconditions;
 
 /**
  * Base class for operations that operates on two input data sets.
- * 
+ *
  * @param <IN1> The data type of the first input data set.
  * @param <IN2> The data type of the second input data set.
  * @param <OUT> The data type of the returned data set.
  */
 @Public
 public abstract class TwoInputOperator<IN1, IN2, OUT, O extends TwoInputOperator<IN1, IN2, OUT, O>> extends Operator<OUT, O> {
-	
+
 	private final DataSet<IN1> input1;
 	private final DataSet<IN2> input2;
-	
-	
+
 	protected TwoInputOperator(DataSet<IN1> input1, DataSet<IN2> input2, TypeInformation<OUT> resultType) {
 		super(Preconditions.checkNotNull(input1, "input1 is null").getExecutionEnvironment(), resultType);
 		Preconditions.checkNotNull(input2, "input2 is null");
@@ -44,48 +43,48 @@ public abstract class TwoInputOperator<IN1, IN2, OUT, O extends TwoInputOperator
 		this.input1 = input1;
 		this.input2 = input2;
 	}
-	
+
 	/**
 	 * Gets the data set that this operation uses as its first input.
-	 * 
+	 *
 	 * @return The data set that this operation uses as its first input.
 	 */
 	public DataSet<IN1> getInput1() {
 		return this.input1;
 	}
-	
+
 	/**
 	 * Gets the data set that this operation uses as its second input.
-	 * 
+	 *
 	 * @return The data set that this operation uses as its second input.
 	 */
 	public DataSet<IN2> getInput2() {
 		return this.input2;
 	}
-	
+
 	/**
 	 * Gets the type information of the data type of the first input data set.
 	 * This method returns equivalent information as {@code getInput1().getType()}.
-	 * 
+	 *
 	 * @return The first input data type.
 	 */
 	public TypeInformation<IN1> getInput1Type() {
 		return this.input1.getType();
 	}
-	
+
 	/**
 	 * Gets the type information of the data type of the second input data set.
 	 * This method returns equivalent information as {@code getInput2().getType()}.
-	 * 
+	 *
 	 * @return The second input data type.
 	 */
 	public TypeInformation<IN2> getInput2Type() {
 		return this.input2.getType();
 	}
-	
+
 	/**
 	 * Translates this java API operator into a common API operator with two inputs.
-	 * 
+	 *
 	 * @param input1 The first input of the operation, as a common API operator.
 	 * @param input2 The second input of the operation, as a common API operator.
 	 * @return The created common API operator.

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
index 695ed3a..b78d17e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/TwoInputUdfOperator.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.api.java.operators;
 
-import java.lang.annotation.Annotation;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
@@ -39,6 +33,12 @@ import org.apache.flink.api.java.functions.SemanticPropUtil;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 
+import java.lang.annotation.Annotation;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -46,8 +46,8 @@ import static java.util.Objects.requireNonNull;
  * user-defined functions (UDFs). The UDFs encapsulated by this operator are naturally UDFs that
  * have two inputs (such as {@link org.apache.flink.api.common.functions.RichJoinFunction} or
  * {@link org.apache.flink.api.common.functions.RichCoGroupFunction}).
- * <p>
- * This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization
+ *
+ * <p>This class encapsulates utilities for the UDFs, such as broadcast variables, parameterization
  * through configuration objects, and semantic properties.
  *
  * @param <IN1> The data type of the first input data set.
@@ -56,8 +56,7 @@ import static java.util.Objects.requireNonNull;
  */
 @Public
 public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOperator<IN1, IN2, OUT, O>>
-	extends TwoInputOperator<IN1, IN2, OUT, O> implements UdfOperator<O>
-{
+	extends TwoInputOperator<IN1, IN2, OUT, O> implements UdfOperator<O> {
 	private Configuration parameters;
 
 	private Map<String, DataSet<?>> broadcastVariables;
@@ -104,7 +103,7 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 		if (name == null) {
 			throw new IllegalArgumentException("Broadcast variable name must not be null.");
 		}
-		
+
 		if (this.broadcastVariables == null) {
 			this.broadcastVariables = new HashMap<String, DataSet<?>>();
 		}
@@ -117,46 +116,34 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	}
 
 	/**
-	 * <p>
 	 * Adds semantic information about forwarded fields of the first input of the user-defined function.
 	 * The forwarded fields information declares fields which are never modified by the function and
 	 * which are forwarded at the same position to the output or unchanged copied to another position in the output.
-	 * </p>
 	 *
-	 * <p>
-	 * Fields that are forwarded at the same position are specified by their position.
+	 * <p>Fields that are forwarded at the same position are specified by their position.
 	 * The specified position must be valid for the input and output data type and have the same type.
 	 * For example <code>withForwardedFieldsFirst("f2")</code> declares that the third field of a Java input tuple
 	 * from the first input is copied to the third field of an output tuple.
-	 * </p>
 	 *
-	 * <p>
-	 * Fields which are unchanged copied from the first input to another position in the output are declared
+	 * <p>Fields which are unchanged copied from the first input to another position in the output are declared
 	 * by specifying the source field reference in the first input and the target field reference in the output.
 	 * {@code withForwardedFieldsFirst("f0->f2")} denotes that the first field of the first input Java tuple is
 	 * unchanged copied to the third field of the Java output tuple. When using a wildcard ("*") ensure that
 	 * the number of declared fields and their types in first input and output type match.
-	 * </p>
 	 *
-	 * <p>
-	 * Multiple forwarded fields can be annotated in one ({@code withForwardedFieldsFirst("f2; f3->f0; f4")})
+	 * <p>Multiple forwarded fields can be annotated in one ({@code withForwardedFieldsFirst("f2; f3->f0; f4")})
 	 * or separate Strings ({@code withForwardedFieldsFirst("f2", "f3->f0", "f4")}).
 	 * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for
 	 * details on field references such as nested fields and wildcard.
-	 * </p>
 	 *
-	 * <p>
-	 * It is not possible to override existing semantic information about forwarded fields of the first input which was
+	 * <p>It is not possible to override existing semantic information about forwarded fields of the first input which was
 	 * for example added by a {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst} class annotation.
-	 * </p>
 	 *
-	 * <p>
-	 * <b>NOTE: Adding semantic information for functions is optional!
+	 * <p><b>NOTE: Adding semantic information for functions is optional!
 	 * If used correctly, semantic information can help the Flink optimizer to generate more efficient execution plans.
 	 * However, incorrect semantic information can cause the optimizer to generate incorrect execution plans which compute wrong results!
 	 * So be careful when adding semantic information.
 	 * </b>
-	 * </p>
 	 *
 	 * @param forwardedFieldsFirst A list of forwarded field expressions for the first input of the function.
 	 * @return This operator with annotated forwarded field information.
@@ -171,12 +158,12 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 			setSemanticProperties(extractSemanticAnnotationsFromUdf(getFunction().getClass()));
 		}
 
-		if(this.udfSemantics == null || this.analyzedUdfSemantics) {
+		if (this.udfSemantics == null || this.analyzedUdfSemantics) {
 			setSemanticProperties(new DualInputSemanticProperties());
 			SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, forwardedFieldsFirst, null,
 					null, null, null, null, getInput1Type(), getInput2Type(), getResultType());
 		} else {
-			if(this.udfWithForwardedFieldsFirstAnnotation(getFunction().getClass())) {
+			if (this.udfWithForwardedFieldsFirstAnnotation(getFunction().getClass())) {
 				// refuse semantic information as it would override the function annotation
 				throw new SemanticProperties.InvalidSemanticAnnotationException("Forwarded field information " +
 						"has already been added by a function annotation for the first input of this operator. " +
@@ -192,46 +179,34 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	}
 
 	/**
-	 * <p>
 	 * Adds semantic information about forwarded fields of the second input of the user-defined function.
 	 * The forwarded fields information declares fields which are never modified by the function and
 	 * which are forwarded at the same position to the output or unchanged copied to another position in the output.
-	 * </p>
 	 *
-	 * <p>
-	 * Fields that are forwarded at the same position are specified by their position.
+	 * <p>Fields that are forwarded at the same position are specified by their position.
 	 * The specified position must be valid for the input and output data type and have the same type.
 	 * For example <code>withForwardedFieldsSecond("f2")</code> declares that the third field of a Java input tuple
 	 * from the second input is copied to the third field of an output tuple.
-	 * </p>
 	 *
-	 * <p>
-	 * Fields which are unchanged copied from the second input to another position in the output are declared
+	 * <p>Fields which are unchanged copied from the second input to another position in the output are declared
 	 * by specifying the source field reference in the second input and the target field reference in the output.
 	 * {@code withForwardedFieldsSecond("f0->f2")} denotes that the first field of the second input Java tuple is
 	 * unchanged copied to the third field of the Java output tuple. When using a wildcard ("*") ensure that
 	 * the number of declared fields and their types in second input and output type match.
-	 * </p>
 	 *
-	 * <p>
-	 * Multiple forwarded fields can be annotated in one ({@code withForwardedFieldsSecond("f2; f3->f0; f4")})
+	 * <p>Multiple forwarded fields can be annotated in one ({@code withForwardedFieldsSecond("f2; f3->f0; f4")})
 	 * or separate Strings ({@code withForwardedFieldsSecond("f2", "f3->f0", "f4")}).
 	 * Please refer to the JavaDoc of {@link org.apache.flink.api.common.functions.Function} or Flink's documentation for
 	 * details on field references such as nested fields and wildcard.
-	 * </p>
 	 *
-	 * <p>
-	 * It is not possible to override existing semantic information about forwarded fields of the second input which was
+	 * <p>It is not possible to override existing semantic information about forwarded fields of the second input which was
 	 * for example added by a {@link org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond} class annotation.
-	 * </p>
 	 *
-	 * <p>
-	 * <b>NOTE: Adding semantic information for functions is optional!
+	 * <p><b>NOTE: Adding semantic information for functions is optional!
 	 * If used correctly, semantic information can help the Flink optimizer to generate more efficient execution plans.
 	 * However, incorrect semantic information can cause the optimizer to generate incorrect execution plans which compute wrong results!
 	 * So be careful when adding semantic information.
 	 * </b>
-	 * </p>
 	 *
 	 * @param forwardedFieldsSecond A list of forwarded field expressions for the second input of the function.
 	 * @return This operator with annotated forwarded field information.
@@ -246,12 +221,12 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 			setSemanticProperties(extractSemanticAnnotationsFromUdf(getFunction().getClass()));
 		}
 
-		if(this.udfSemantics == null || this.analyzedUdfSemantics) {
+		if (this.udfSemantics == null || this.analyzedUdfSemantics) {
 			setSemanticProperties(new DualInputSemanticProperties());
 			SemanticPropUtil.getSemanticPropsDualFromString(this.udfSemantics, null, forwardedFieldsSecond,
 					null, null, null, null, getInput1Type(), getInput2Type(), getResultType());
 		} else {
-			if(udfWithForwardedFieldsSecondAnnotation(getFunction().getClass())) {
+			if (udfWithForwardedFieldsSecondAnnotation(getFunction().getClass())) {
 				// refuse semantic information as it would override the function annotation
 				throw new SemanticProperties.InvalidSemanticAnnotationException("Forwarded field information " +
 						"has already been added by a function annotation for the second input of this operator. " +
@@ -282,7 +257,7 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	 *
 	 * <p>Use this method the following way:
 	 * <pre>{@code
-	 *     DataSet<String[]> result = 
+	 *     DataSet<String[]> result =
 	 *         data1.join(data2).where("id").equalTo("fieldX")
 	 *              .with(new JoinFunctionWithNonInferrableReturnType())
 	 *              .returns(String[].class);
@@ -312,7 +287,7 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	 *
 	 * <p>Use this method the following way:
 	 * <pre>{@code
-	 *     DataSet<Tuple2<String, Double>> result = 
+	 *     DataSet<Tuple2<String, Double>> result =
 	 *         data1.join(data2).where("id").equalTo("fieldX")
 	 *              .with(new JoinFunctionWithNonInferrableReturnType())
 	 *              .returns(new TypeHint<Tuple2<String, Double>>(){});
@@ -355,14 +330,14 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	}
 
 	/**
-	 * Adds a type information hint about the return type of this operator. 
+	 * Adds a type information hint about the return type of this operator.
 	 *
-	 * <p>
-	 * Type hints are important in cases where the Java compiler
+	 *
+	 * <p>Type hints are important in cases where the Java compiler
 	 * throws away generic type information necessary for efficient execution.
 	 *
-	 * <p>
-	 * This method takes a type information string that will be parsed. A type information string can contain the following
+	 *
+	 * <p>This method takes a type information string that will be parsed. A type information string can contain the following
 	 * types:
 	 *
 	 * <ul>
@@ -382,7 +357,7 @@ public abstract class TwoInputUdfOperator<IN1, IN2, OUT, O extends TwoInputUdfOp
 	 * <li>Enum types such as <code>Enum&lt;org.my.CustomEnum&gt;</code></li>
 	 * </ul>
 	 *
-	 * Example:
+	 * <p>Example:
 	 * <code>"Tuple2&lt;String,Tuple2&lt;Integer,org.my.MyJob$Pojo&lt;word=String&gt;&gt;&gt;"</code>
 	 *
 	 * @param typeInfoString

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
index e4f7888..249a5cb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperator.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.api.java.operators;
 
-import java.util.Map;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.configuration.Configuration;
 
-import org.apache.flink.api.java.DataSet;
+import java.util.Map;
 
 /**
  * This interface marks operators as operators that execute user-defined functions (UDFs), such as
@@ -35,60 +34,60 @@ import org.apache.flink.api.java.DataSet;
  */
 @Public
 public interface UdfOperator<O extends UdfOperator<O>> {
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Accessors
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Gets the configuration parameters that will be passed to the UDF's open method
 	 * {@link org.apache.flink.api.common.functions.AbstractRichFunction#open(Configuration)}.
 	 * The configuration is set via the {@link #withParameters(Configuration)}
 	 * method.
-	 * 
+	 *
 	 * @return The configuration parameters for the UDF.
 	 */
 	Configuration getParameters();
-	
+
 	/**
 	 * Gets the broadcast sets (name and data set) that have been added to context of the UDF.
 	 * Broadcast sets are added to a UDF via the method {@link #withBroadcastSet(DataSet, String)}.
-	 * 
+	 *
 	 * @return The broadcast data sets that have been added to this UDF.
 	 */
 	@Internal
 	Map<String, DataSet<?>> getBroadcastSets();
-	
+
 	/**
 	 * Gets the semantic properties that have been set for the user-defined functions (UDF).
-	 * 
+	 *
 	 * @return The semantic properties of the UDF.
 	 */
 	@Internal
 	SemanticProperties getSemanticProperties();
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Fluent API methods
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Sets the configuration parameters for the UDF. These are optional parameters that are passed
 	 * to the UDF in the {@link org.apache.flink.api.common.functions.AbstractRichFunction#open(Configuration)} method.
-	 * 
+	 *
 	 * @param parameters The configuration parameters for the UDF.
 	 * @return The operator itself, to allow chaining function calls.
 	 */
 	O withParameters(Configuration parameters);
-	
+
 	/**
 	 * Adds a certain data set as a broadcast set to this operator. Broadcasted data sets are available at all
 	 * parallel instances of this operator. A broadcast data set is registered under a certain name, and can be
 	 * retrieved under that name from the operators runtime context via
 	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getBroadcastVariable(String)}.
-	 * 
-	 * The runtime context itself is available in all UDFs via
+	 *
+	 * <p>The runtime context itself is available in all UDFs via
 	 * {@link org.apache.flink.api.common.functions.AbstractRichFunction#getRuntimeContext()}.
-	 * 
+	 *
 	 * @param data The data set to be broadcasted.
 	 * @param name The name under which the broadcast data set retrieved.
 	 * @return The operator itself, to allow chaining function calls.

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
index 660f845..f8e04a9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.sca.CodeAnalyzerException;
 import org.apache.flink.api.java.sca.UdfAnalyzer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +46,7 @@ public final class UdfOperatorUtils {
 		if (mode != CodeAnalysisMode.DISABLE
 				&& !udf.getClass().isAnnotationPresent(FunctionAnnotation.SkipCodeAnalysis.class)) {
 			final String operatorName = operator.getName() != null ? operator.getName()
-					: udfBaseClass.getSimpleName() + " at "+defaultName;
+					: udfBaseClass.getSimpleName() + " at " + defaultName;
 			try {
 				final UdfAnalyzer analyzer = new UdfAnalyzer(udfBaseClass, udf.getClass(), operatorName, operator.getInputType(), null,
 						operator.getResultType(), key, null, mode == CodeAnalysisMode.OPTIMIZE);

http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index 70ad374..0da5e01 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -25,35 +25,35 @@ import org.apache.flink.api.common.operators.Union;
 import org.apache.flink.api.java.DataSet;
 
 /**
- * Java API operator for union of two data sets
- * 
- * @param <T> The type of the two input data sets and the result data set 
+ * Java API operator for union of two data sets.
+ *
+ * @param <T> The type of the two input data sets and the result data set
  */
 @Public
 public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T>> {
 
 	private final String unionLocationName;
-	
+
 	/**
 	 * Create an operator that produces the union of the two given data sets.
-	 * 
+	 *
 	 * @param input1 The first data set to be unioned.
 	 * @param input2 The second data set to be unioned.
 	 */
 	public UnionOperator(DataSet<T> input1, DataSet<T> input2, String unionLocationName) {
 		super(input1, input2, input1.getType());
-		
+
 		if (!input1.getType().equals(input2.getType())) {
-			throw new InvalidProgramException("Cannot union inputs of different types. Input1=" 
+			throw new InvalidProgramException("Cannot union inputs of different types. Input1="
 					+ input1.getType() + ", input2=" + input2.getType());
 		}
-		
+
 		this.unionLocationName = unionLocationName;
 	}
-	
+
 	/**
 	 * Returns the BinaryNodeTranslation of the Union.
-	 * 
+	 *
 	 * @param input1 The first input of the union, as a common API operator.
 	 * @param input2 The second input of the union, as a common API operator.
 	 * @return The common API union operator.


Mime
View raw message