flink-commits mailing list archives

From ches...@apache.org
Subject [4/8] flink git commit: [FLINK-7190] [java] Activate checkstyle flink-java/*
Date Tue, 25 Jul 2017 11:31:20 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 3d8a384..089c90b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.api.java;
 
-import com.esotericsoftware.kryo.Serializer;
-
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -62,9 +60,9 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SplittableIterator;
 import org.apache.flink.util.Visitor;
 
+import com.esotericsoftware.kryo.Serializer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,11 +82,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * The ExecutionEnvironment is the context in which a program is executed. A
  * {@link LocalEnvironment} will cause execution in the current JVM, a
  * {@link RemoteEnvironment} will cause execution on a remote setup.
- * <p>
- * The environment provides methods to control the job execution (such as setting the parallelism)
+ *
+ * <p>The environment provides methods to control the job execution (such as setting the parallelism)
  * and to interact with the outside world (data access).
- * <p>
- * Please note that the execution environment needs strong type information for the input and return types
+ *
+ * <p>Please note that the execution environment needs strong type information for the input and return types
  * of all operations that are executed. This means that the environments needs to know that the return
  * value of an operation is for example a Tuple of String and Integer.
  * Because the Java compiler throws much of the generic type information away, most methods attempt to re-
@@ -101,13 +99,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Public
 public abstract class ExecutionEnvironment {
 
-	/** The logger used by the environment and its subclasses */
+	/** The logger used by the environment and its subclasses. */
 	protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);
 
-	/** The environment of the context (local by default, cluster if invoked through command line) */
+	/** The environment of the context (local by default, cluster if invoked through command line). */
 	private static ExecutionEnvironmentFactory contextEnvironmentFactory;
 
-	/** The default parallelism used by local environments */
+	/** The default parallelism used by local environments. */
 	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
 
 	// --------------------------------------------------------------------------------------------
@@ -118,20 +116,19 @@ public abstract class ExecutionEnvironment {
 
 	private final ExecutionConfig config = new ExecutionConfig();
 
-	/** Result from the latest execution, to make it retrievable when using eager execution methods */
+	/** Result from the latest execution, to make it retrievable when using eager execution methods. */
 	protected JobExecutionResult lastJobExecutionResult;
 
 	/** The ID of the session, defined by this execution environment. Sessions and Jobs are same in
-	 *  Flink, as Jobs can consist of multiple parts that are attached to the growing dataflow graph */
+	 *  Flink, as Jobs can consist of multiple parts that are attached to the growing dataflow graph. */
 	protected JobID jobID;
 
-	/** The session timeout in seconds */
+	/** The session timeout in seconds. */
 	protected long sessionTimeout;
 
-	/** Flag to indicate whether sinks have been cleared in previous executions */
+	/** Flag to indicate whether sinks have been cleared in previous executions. */
 	private boolean wasExecuted = false;
 
-
 	/**
 	 * Creates a new Execution Environment.
 	 */
@@ -171,8 +168,8 @@ public abstract class ExecutionEnvironment {
 	 * Sets the parallelism for operations executed through this environment.
 	 * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
 	 * x parallel instances.
-	 * <p>
-	 * This method overrides the default parallelism for this environment.
+	 *
+	 * <p>This method overrides the default parallelism for this environment.
 	 * The {@link LocalEnvironment} uses by default a value equal to the number of hardware
 	 * contexts (CPU cores / threads). When executing the program via the command line client
 	 * from a JAR file, the default parallelism is the one configured for that setup.
@@ -316,7 +313,7 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Adds a new Kryo default serializer to the Runtime.
 	 *
-	 * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
+	 * <p>Note that the serializer instance must be serializable (as defined by java.io.Serializable),
 	 * because it may be distributed to the worker nodes by java serialization.
 	 *
 	 * @param type The class of the types serialized with the given serializer.
@@ -339,7 +336,7 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Registers the given type with a Kryo Serializer.
 	 *
-	 * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
+	 * <p>Note that the serializer instance must be serializable (as defined by java.io.Serializable),
 	 * because it may be distributed to the worker nodes by java serialization.
 	 *
 	 * @param type The class of the types serialized with the given serializer.
@@ -350,7 +347,7 @@ public abstract class ExecutionEnvironment {
 	}
 
 	/**
-	 * Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer
+	 * Registers the given Serializer via its class as a serializer for the given type at the KryoSerializer.
 	 *
 	 * @param type The class of the types serialized with the given serializer.
 	 * @param serializerClass The class of the serializer to use.
@@ -423,8 +420,8 @@ public abstract class ExecutionEnvironment {
 	 * This method is similar to {@link #readTextFile(String)}, but it produces a DataSet with mutable
 	 * {@link StringValue} objects, rather than Java Strings. StringValues can be used to tune implementations
 	 * to be less object and garbage collection heavy.
-	 * <p>
-	 * The file will be read with the system's default character set.
+	 *
+	 * <p>The file will be read with the system's default character set.
 	 *
 	 * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @return A {@link DataSet} that represents the data read from the given file as text lines.
@@ -440,8 +437,8 @@ public abstract class ExecutionEnvironment {
 	 * This method is similar to {@link #readTextFile(String, String)}, but it produces a DataSet with mutable
 	 * {@link StringValue} objects, rather than Java Strings. StringValues can be used to tune implementations
 	 * to be less object and garbage collection heavy.
-	 * <p>
-	 * The {@link java.nio.charset.Charset} with the given name will be used to read the files.
+	 *
+	 * <p>The {@link java.nio.charset.Charset} with the given name will be used to read the files.
 	 *
 	 * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @param charsetName The name of the character set used to read the file.
@@ -532,8 +529,8 @@ public abstract class ExecutionEnvironment {
 	 * Generic method to create an input {@link DataSet} with in {@link InputFormat}. The DataSet will not be
 	 * immediately created - instead, this method returns a DataSet that will be lazily created from
 	 * the input format once the program is executed.
-	 * <p>
-	 * Since all data sets need specific information about their types, this method needs to determine
+	 *
+	 * <p>Since all data sets need specific information about their types, this method needs to determine
 	 * the type of the data produced by the input format. It will attempt to determine the data type
 	 * by reflection, unless the input format implements the {@link ResultTypeQueryable} interface.
 	 * In the latter case, this method will invoke the {@link ResultTypeQueryable#getProducedType()}
@@ -563,8 +560,8 @@ public abstract class ExecutionEnvironment {
 	 * Generic method to create an input DataSet with in {@link InputFormat}. The {@link DataSet} will not be
 	 * immediately created - instead, this method returns a {@link DataSet} that will be lazily created from
 	 * the input format once the program is executed.
-	 * <p>
-	 * The {@link DataSet} is typed to the given TypeInformation. This method is intended for input formats that
+	 *
+	 * <p>The {@link DataSet} is typed to the given TypeInformation. This method is intended for input formats that
 	 * where the return type cannot be determined by reflection analysis, and that do not implement the
 	 * {@link ResultTypeQueryable} interface.
 	 *
@@ -590,12 +587,12 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}.
 	 *
-	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String, JobConf)}
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String, JobConf)}
 	 * from the flink-hadoop-compatibility module.
 	 */
 	@Deprecated
 	@PublicEvolving
-	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
 		DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
 
 		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
@@ -607,12 +604,12 @@ public abstract class ExecutionEnvironment {
 	 * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat}
 	 * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
 	 *
-	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readSequenceFile(Class<K>, Class<V>, String)}
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readSequenceFile(Class<K>, Class<V>, String)}
 	 * from the flink-hadoop-compatibility module.
 	 */
 	@Deprecated
 	@PublicEvolving
-	public <K,V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
+	public <K, V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
 		return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
 	}
 
@@ -620,24 +617,24 @@ public abstract class ExecutionEnvironment {
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
 	 * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
 	 *
-	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
 	 * from the flink-hadoop-compatibility module.
 	 */
 	@Deprecated
 	@PublicEvolving
-	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
 		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
 	}
 
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
 	 *
-	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V>, Class<K>, Class<V>, JobConf)}
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V>, Class<K>, Class<V>, JobConf)}
 	 * from the flink-hadoop-compatibility module.
 	 */
 	@Deprecated
 	@PublicEvolving
-	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+	public <K, V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
 		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
 
 		return this.createInput(hadoopInputFormat);
@@ -647,12 +644,12 @@ public abstract class ExecutionEnvironment {
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
 	 * given inputName is set on the given job.
 	 *
-	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String, Job)}
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String, Job)}
 	 * from the flink-hadoop-compatibility module.
 	 */
 	@Deprecated
 	@PublicEvolving
-	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
+	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
 		DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);
 
 		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
@@ -665,45 +662,45 @@ public abstract class ExecutionEnvironment {
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
 	 * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
 	 *
-	 * @deprecated Please use {@link  org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
+	 * @deprecated Please use {@code  org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
 	 * from the flink-hadoop-compatibility module.
 	 */
 	@Deprecated
 	@PublicEvolving
-	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
+	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
 		return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
 	}
 
 	/**
 	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
 	 *
-	 * @deprecated Please use {@link org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V>, Class<K>, Class<V>, Job)}
+	 * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V>, Class<K>, Class<V>, Job)}
 	 * from the flink-hadoop-compatibility module.
 	 */
 	@Deprecated
 	@PublicEvolving
-	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
+	public <K, V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
 		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
 
 		return this.createInput(hadoopInputFormat);
 	}
-	
+
 	// ----------------------------------- Collection ---------------------------------------
-	
+
 	/**
 	 * Creates a DataSet from the given non-empty collection. The type of the data set is that
 	 * of the elements in the collection.
-	 * <p>
-	 * The framework will try and determine the exact type from the collection elements.
+	 *
+	 * <p>The framework will try and determine the exact type from the collection elements.
 	 * In case of generic elements, it may be necessary to manually supply the type information
 	 * via {@link #fromCollection(Collection, TypeInformation)}.
-	 * <p>
-	 * Note that this operation will result in a non-parallel data source, i.e. a data source with
+	 *
+	 * <p>Note that this operation will result in a non-parallel data source, i.e. a data source with
 	 * a parallelism of one.
-	 * 
+	 *
 	 * @param data The collection of elements to create the data set from.
 	 * @return A DataSet representing the given collection.
-	 * 
+	 *
 	 * @see #fromCollection(Collection, TypeInformation)
 	 */
 	public <X> DataSource<X> fromCollection(Collection<X> data) {
@@ -713,86 +710,85 @@ public abstract class ExecutionEnvironment {
 		if (data.size() == 0) {
 			throw new IllegalArgumentException("The size of the collection must not be empty.");
 		}
-		
+
 		X firstValue = data.iterator().next();
-		
+
 		TypeInformation<X> type = TypeExtractor.getForObject(firstValue);
 		CollectionInputFormat.checkCollection(data, type.getTypeClass());
 		return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, Utils.getCallLocationName());
 	}
-	
+
 	/**
 	 * Creates a DataSet from the given non-empty collection. Note that this operation will result
 	 * in a non-parallel data source, i.e. a data source with a parallelism of one.
-	 * <p>
-	 * The returned DataSet is typed to the given TypeInformation.
-	 *  
+	 *
+	 * <p>The returned DataSet is typed to the given TypeInformation.
+	 *
 	 * @param data The collection of elements to create the data set from.
 	 * @param type The TypeInformation for the produced data set.
 	 * @return A DataSet representing the given collection.
-	 * 
+	 *
 	 * @see #fromCollection(Collection)
 	 */
 	public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
 		return fromCollection(data, type, Utils.getCallLocationName());
 	}
-	
+
 	private <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type, String callLocationName) {
 		CollectionInputFormat.checkCollection(data, type.getTypeClass());
 		return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, callLocationName);
 	}
-	
+
 	/**
 	 * Creates a DataSet from the given iterator. Because the iterator will remain unmodified until
 	 * the actual execution happens, the type of data returned by the iterator must be given
 	 * explicitly in the form of the type class (this is due to the fact that the Java compiler
 	 * erases the generic type information).
-	 * <p>
-	 * Note that this operation will result in a non-parallel data source, i.e. a data source with
+	 *
+	 * <p>Note that this operation will result in a non-parallel data source, i.e. a data source with
 	 * a parallelism of one.
-	 * 
+	 *
 	 * @param data The collection of elements to create the data set from.
 	 * @param type The class of the data produced by the iterator. Must not be a generic class.
 	 * @return A DataSet representing the elements in the iterator.
-	 * 
+	 *
 	 * @see #fromCollection(Iterator, TypeInformation)
 	 */
 	public <X> DataSource<X> fromCollection(Iterator<X> data, Class<X> type) {
 		return fromCollection(data, TypeExtractor.getForClass(type));
 	}
-	
+
 	/**
 	 * Creates a DataSet from the given iterator. Because the iterator will remain unmodified until
 	 * the actual execution happens, the type of data returned by the iterator must be given
 	 * explicitly in the form of the type information. This method is useful for cases where the type
 	 * is generic. In that case, the type class (as given in {@link #fromCollection(Iterator, Class)}
 	 * does not supply all type information.
-	 * <p>
-	 * Note that this operation will result in a non-parallel data source, i.e. a data source with
+	 *
+	 * <p>Note that this operation will result in a non-parallel data source, i.e. a data source with
 	 * a parallelism of one.
-	 * 
+	 *
 	 * @param data The collection of elements to create the data set from.
 	 * @param type The TypeInformation for the produced data set.
 	 * @return A DataSet representing the elements in the iterator.
-	 * 
+	 *
 	 * @see #fromCollection(Iterator, Class)
 	 */
 	public <X> DataSource<X> fromCollection(Iterator<X> data, TypeInformation<X> type) {
 		return new DataSource<>(this, new IteratorInputFormat<>(data), type, Utils.getCallLocationName());
 	}
-	
-	
+
 	/**
 	 * Creates a new data set that contains the given elements. The elements must all be of the same type,
 	 * for example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty.
-	 * <p>
-	 * The framework will try and determine the exact type from the collection elements.
+	 *
+	 * <p>The framework will try and determine the exact type from the collection elements.
 	 * In case of generic elements, it may be necessary to manually supply the type information
 	 * via {@link #fromCollection(Collection, TypeInformation)}.
-	 * <p>
-	 * Note that this operation will result in a non-parallel data source, i.e. a data source with
+	 *
+	 * <p>Note that this operation will result in a non-parallel data source, i.e. a data source with
 	 * a parallelism of one.
-	 * 
+	 *
 	 * @param data The elements to make up the data set.
 	 * @return A DataSet representing the given list of elements.
 	 */
@@ -804,7 +800,7 @@ public abstract class ExecutionEnvironment {
 		if (data.length == 0) {
 			throw new IllegalArgumentException("The number of elements must not be zero.");
 		}
-		
+
 		TypeInformation<X> typeInfo;
 		try {
 			typeInfo = TypeExtractor.getForObject(data[0]);
@@ -817,10 +813,10 @@ public abstract class ExecutionEnvironment {
 
 		return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
 	}
-	
+
 	/**
-	 * Creates a new data set that contains the given elements. The framework will determine the type according to the 
-	 * based type user supplied. The elements should be the same or be the subclass to the based type. 
+	 * Creates a new data set that contains the given elements. The framework will determine the type according to the
+	 * based type user supplied. The elements should be the same or be the subclass to the based type.
 	 * The sequence of elements must not be empty.
 	 * Note that this operation will result in a non-parallel data source, i.e. a data source with
 	 * a parallelism of one.
@@ -837,7 +833,7 @@ public abstract class ExecutionEnvironment {
 		if (data.length == 0) {
 			throw new IllegalArgumentException("The number of elements must not be zero.");
 		}
-		
+
 		TypeInformation<X> typeInfo;
 		try {
 			typeInfo = TypeExtractor.getForClass(type);
@@ -850,136 +846,135 @@ public abstract class ExecutionEnvironment {
 
 		return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
 	}
-	
-	
+
 	/**
 	 * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
 	 * framework to create a parallel data source that returns the elements in the iterator.
-	 * <p>
-	 * Because the iterator will remain unmodified until the actual execution happens, the type of data
+	 *
+	 * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data
 	 * returned by the iterator must be given explicitly in the form of the type class (this is due to the
 	 * fact that the Java compiler erases the generic type information).
-	 * 
+	 *
 	 * @param iterator The iterator that produces the elements of the data set.
 	 * @param type The class of the data produced by the iterator. Must not be a generic class.
 	 * @return A DataSet representing the elements in the iterator.
-	 * 
+	 *
 	 * @see #fromParallelCollection(SplittableIterator, TypeInformation)
 	 */
 	public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, Class<X> type) {
 		return fromParallelCollection(iterator, TypeExtractor.getForClass(type));
 	}
-	
+
 	/**
 	 * Creates a new data set that contains elements in the iterator. The iterator is splittable, allowing the
 	 * framework to create a parallel data source that returns the elements in the iterator.
-	 * <p>
-	 * Because the iterator will remain unmodified until the actual execution happens, the type of data
+	 *
+	 * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data
 	 * returned by the iterator must be given explicitly in the form of the type information.
 	 * This method is useful for cases where the type is generic. In that case, the type class
 	 * (as given in {@link #fromParallelCollection(SplittableIterator, Class)} does not supply all type information.
-	 * 
+	 *
 	 * @param iterator The iterator that produces the elements of the data set.
 	 * @param type The TypeInformation for the produced data set.
 	 * @return A DataSet representing the elements in the iterator.
-	 * 
+	 *
 	 * @see #fromParallelCollection(SplittableIterator, Class)
 	 */
 	public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type) {
 		return fromParallelCollection(iterator, type, Utils.getCallLocationName());
 	}
-	
+
 	// private helper for passing different call location names
 	private <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, TypeInformation<X> type, String callLocationName) {
 		return new DataSource<>(this, new ParallelIteratorInputFormat<>(iterator), type, callLocationName);
 	}
-	
+
 	/**
 	 * Creates a new data set that contains a sequence of numbers. The data set will be created in parallel,
 	 * so there is no guarantee about the order of the elements.
-	 * 
+	 *
 	 * @param from The number to start at (inclusive).
 	 * @param to The number to stop at (inclusive).
 	 * @return A DataSet, containing all number in the {@code [from, to]} interval.
 	 */
 	public DataSource<Long> generateSequence(long from, long to) {
 		return fromParallelCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, Utils.getCallLocationName());
-	}	
-	
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Executing
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Triggers the program execution. The environment will execute all parts of the program that have
 	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
 	 * writing results (e.g. {@link DataSet#writeAsText(String)},
 	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
 	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
-	 * <p>
-	 * The program execution will be logged and displayed with a generated default name.
-	 * 
+	 *
+	 * <p>The program execution will be logged and displayed with a generated default name.
+	 *
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 * @throws Exception Thrown, if the program executions fails.
 	 */
 	public JobExecutionResult execute() throws Exception {
 		return execute(getDefaultName());
 	}
-	
+
 	/**
 	 * Triggers the program execution. The environment will execute all parts of the program that have
 	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
 	 * writing results (e.g. {@link DataSet#writeAsText(String)},
 	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
 	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
-	 * <p>
-	 * The program execution will be logged and displayed with the given job name.
-	 * 
+	 *
+	 * <p>The program execution will be logged and displayed with the given job name.
+	 *
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 * @throws Exception Thrown, if the program executions fails.
 	 */
 	public abstract JobExecutionResult execute(String jobName) throws Exception;
 
 	/**
-	 * Creates the plan with which the system will execute the program, and returns it as 
+	 * Creates the plan with which the system will execute the program, and returns it as
 	 * a String using a JSON representation of the execution data flow graph.
 	 * Note that this needs to be called, before the plan is executed.
-	 * 
+	 *
 	 * @return The execution plan of the program, as a JSON String.
 	 * @throws Exception Thrown, if the compiler could not be instantiated, or the master could not
 	 *                   be contacted to retrieve information relevant to the execution planning.
 	 */
 	public abstract String getExecutionPlan() throws Exception;
-	
+
 	/**
 	 * Registers a file at the distributed cache under the given name. The file will be accessible
 	 * from any user-defined function in the (distributed) runtime under a local path. Files
 	 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
 	 * The runtime will copy the files temporarily to a local cache, if needed.
-	 * <p>
-	 * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+	 *
+	 * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
 	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
-	 * {@link org.apache.flink.api.common.cache.DistributedCache} via 
+	 * {@link org.apache.flink.api.common.cache.DistributedCache} via
 	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
-	 * 
+	 *
 	 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
 	 * @param name The name under which the file is registered.
 	 */
 	public void registerCachedFile(String filePath, String name){
 		registerCachedFile(filePath, name, false);
 	}
-	
+
 	/**
 	 * Registers a file at the distributed cache under the given name. The file will be accessible
 	 * from any user-defined function in the (distributed) runtime under a local path. Files
-	 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system. 
+	 * may be local files (as long as all relevant workers have access to it), or files in a distributed file system.
 	 * The runtime will copy the files temporarily to a local cache, if needed.
-	 * <p>
-	 * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
+	 *
+	 * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs via
 	 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and provides access
-	 * {@link org.apache.flink.api.common.cache.DistributedCache} via 
+	 * {@link org.apache.flink.api.common.cache.DistributedCache} via
 	 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
-	 * 
+	 *
 	 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or "hdfs://host:port/and/path")
 	 * @param name The name under which the file is registered.
 	 * @param executable flag indicating whether the file should be executable
@@ -987,11 +982,11 @@ public abstract class ExecutionEnvironment {
 	public void registerCachedFile(String filePath, String name, boolean executable){
 		this.cacheFile.add(new Tuple2<>(name, new DistributedCacheEntry(filePath, executable)));
 	}
-	
+
 	/**
 	 * Registers all files that were registered at this execution environment's cache registry of the
 	 * given plan's cache registry.
-	 * 
+	 *
 	 * @param p The plan to register files at.
 	 * @throws IOException Thrown if checks for existence and sanity fail.
 	 */
@@ -1000,7 +995,7 @@ public abstract class ExecutionEnvironment {
 			p.registerCachedFile(entry.f0, entry.f1);
 		}
 	}
-	
+
 	/**
 	 * Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks,
 	 * and operations and how they interact, as an isolated unit that can be executed with a
@@ -1008,14 +1003,14 @@ public abstract class ExecutionEnvironment {
 	 * executor is an alternative way to run a program and is only possible if the program consists
 	 * only of distributed operations.
 	 * This automatically starts a new stage of execution.
-	 * 
+	 *
 	 * @return The program's plan.
 	 */
 	@Internal
 	public Plan createProgramPlan() {
 		return createProgramPlan(null);
 	}
-	
+
 	/**
 	 * Creates the program's {@link Plan}. The plan is a description of all data sources, data sinks,
 	 * and operations and how they interact, as an isolated unit that can be executed with a
@@ -1023,7 +1018,7 @@ public abstract class ExecutionEnvironment {
 	 * executor is an alternative way to run a program and is only possible if the program consists
 	 * only of distributed operations.
 	 * This automatically starts a new stage of execution.
-	 * 
+	 *
 	 * @param jobName The name attached to the plan (displayed in logs and monitoring).
 	 * @return The program's plan.
 	 */
@@ -1056,11 +1051,11 @@ public abstract class ExecutionEnvironment {
 						"Examples are writing the data set or printing it.");
 			}
 		}
-		
+
 		if (jobName == null) {
 			jobName = getDefaultName();
 		}
-		
+
 		OperatorTranslation translator = new OperatorTranslation();
 		Plan plan = translator.translateToPlan(this.sinks, jobName);
 
@@ -1068,19 +1063,20 @@ public abstract class ExecutionEnvironment {
 			plan.setDefaultParallelism(getParallelism());
 		}
 		plan.setExecutionConfig(getConfig());
-		
+
 		// Check plan for GenericTypeInfo's and register the types at the serializers.
 		if (!config.isAutoTypeRegistrationDisabled()) {
 			plan.accept(new Visitor<org.apache.flink.api.common.operators.Operator<?>>() {
-				
+
 				private final HashSet<Class<?>> deduplicator = new HashSet<>();
-				
+
 				@Override
 				public boolean preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
 					OperatorInformation<?> opInfo = visitable.getOperatorInfo();
 					Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, deduplicator);
 					return true;
 				}
+
 				@Override
 				public void postVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {}
 			});
@@ -1091,7 +1087,7 @@ public abstract class ExecutionEnvironment {
 		} catch (Exception e) {
 			throw new RuntimeException("Error while registering cached files: " + e.getMessage(), e);
 		}
-		
+
 		// clear all the sinks such that the next execution does not redo everything
 		if (clearSinks) {
 			this.sinks.clear();
@@ -1107,17 +1103,17 @@ public abstract class ExecutionEnvironment {
 				config.getDefaultKryoSerializerClasses().size();
 		LOG.info("The job has {} registered types and {} default Kryo serializers", registeredTypes, defaultKryoSerializers);
 
-		if(config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
+		if (config.isForceKryoEnabled() && config.isForceAvroEnabled()) {
 			LOG.warn("In the ExecutionConfig, both Avro and Kryo are enforced. Using Kryo serializer");
 		}
-		if(config.isForceKryoEnabled()) {
+		if (config.isForceKryoEnabled()) {
 			LOG.info("Using KryoSerializer for serializing POJOs");
 		}
-		if(config.isForceAvroEnabled()) {
+		if (config.isForceAvroEnabled()) {
 			LOG.info("Using AvroSerializer for serializing POJOs");
 		}
 
-		if(LOG.isDebugEnabled()) {
+		if (LOG.isDebugEnabled()) {
 			LOG.debug("Registered Kryo types: {}", config.getRegisteredKryoTypes().toString());
 			LOG.debug("Registered Kryo with Serializers types: {}", config.getRegisteredTypesWithKryoSerializers().entrySet().toString());
 			LOG.debug("Registered Kryo with Serializer Classes types: {}", config.getRegisteredTypesWithKryoSerializerClasses().entrySet().toString());
@@ -1131,27 +1127,27 @@ public abstract class ExecutionEnvironment {
 
 		return plan;
 	}
-	
+
 	/**
 	 * Adds the given sink to this environment. Only sinks that have been added will be executed once
 	 * the {@link #execute()} or {@link #execute(String)} method is called.
-	 * 
+	 *
 	 * @param sink The sink to add for execution.
 	 */
 	@Internal
 	void registerDataSink(DataSink<?> sink) {
 		this.sinks.add(sink);
 	}
-	
+
 	/**
 	 * Gets a default job name, based on the timestamp when this method is invoked.
-	 * 
+	 *
 	 * @return A default job name.
 	 */
 	private static String getDefaultName() {
 		return "Flink Java Job at " + Calendar.getInstance().getTime();
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Instantiation of Execution Contexts
 	// --------------------------------------------------------------------------------------------
@@ -1161,11 +1157,11 @@ public abstract class ExecutionEnvironment {
 	 * If the program is invoked standalone, this method returns a local execution environment, as returned by
 	 * {@link #createLocalEnvironment()}. If the program is invoked from within the command line client to be
 	 * submitted to a cluster, this method returns the execution environment of this cluster.
-	 * 
+	 *
 	 * @return The execution environment of the context in which the program is executed.
 	 */
 	public static ExecutionEnvironment getExecutionEnvironment() {
-		return contextEnvironmentFactory == null ? 
+		return contextEnvironmentFactory == null ?
 				createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment();
 	}
 
@@ -1187,18 +1183,18 @@ public abstract class ExecutionEnvironment {
 	 * multi-threaded fashion in the same JVM as the environment was created in. The default
 	 * parallelism of the local environment is the number of hardware contexts (CPU cores / threads),
 	 * unless it was specified differently by {@link #setDefaultLocalParallelism(int)}.
-	 * 
+	 *
 	 * @return A local execution environment.
 	 */
 	public static LocalEnvironment createLocalEnvironment() {
 		return createLocalEnvironment(defaultLocalDop);
 	}
-	
+
 	/**
 	 * Creates a {@link LocalEnvironment}. The local execution environment will run the program in a
 	 * multi-threaded fashion in the same JVM as the environment was created in. It will use the
 	 * parallelism specified in the parameter.
-	 * 
+	 *
 	 * @param parallelism The parallelism for the local environment.
 	 * @return A local execution environment with the specified parallelism.
 	 */
@@ -1244,13 +1240,13 @@ public abstract class ExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program 
+	 * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
 	 * to a cluster for execution. Note that all file paths used in the program must be accessible from the
 	 * cluster. The execution will use the cluster's default parallelism, unless the parallelism is
 	 * set explicitly via {@link ExecutionEnvironment#setParallelism(int)}.
-	 * 
+	 *
 	 * @param host The host name or address of the master (JobManager), where the program should be executed.
-	 * @param port The port of the master (JobManager), where the program should be executed. 
+	 * @param port The port of the master (JobManager), where the program should be executed.
 	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
 	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
 	 *                 provided in the JAR files.
@@ -1266,7 +1262,7 @@ public abstract class ExecutionEnvironment {
 	 * cluster. The custom configuration file is used to configure Akka specific configuration parameters
 	 * for the Client only; Program parallelism can be set via {@link ExecutionEnvironment#setParallelism(int)}.
 	 *
-	 * Cluster configuration has to be done in the remotely running Flink instance.
+	 * <p>Cluster configuration has to be done in the remotely running Flink instance.
 	 *
 	 * @param host The host name or address of the master (JobManager), where the program should be executed.
 	 * @param port The port of the master (JobManager), where the program should be executed.
@@ -1282,12 +1278,12 @@ public abstract class ExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program 
+	 * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
 	 * to a cluster for execution. Note that all file paths used in the program must be accessible from the
 	 * cluster. The execution will use the specified parallelism.
-	 * 
+	 *
 	 * @param host The host name or address of the master (JobManager), where the program should be executed.
-	 * @param port The port of the master (JobManager), where the program should be executed. 
+	 * @param port The port of the master (JobManager), where the program should be executed.
 	 * @param parallelism The parallelism to use during the execution.
 	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
 	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
@@ -1307,7 +1303,7 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Gets the default parallelism that will be used for the local execution environment created by
 	 * {@link #createLocalEnvironment()}.
-	 * 
+	 *
 	 * @return The default local parallelism
 	 */
 	public static int getDefaultLocalParallelism() {
@@ -1317,7 +1313,7 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Sets the default parallelism that will be used for the local execution environment created by
 	 * {@link #createLocalEnvironment()}.
-	 * 
+	 *
 	 * @param parallelism The parallelism to use as the default local parallelism.
 	 */
 	public static void setDefaultLocalParallelism(int parallelism) {
@@ -1333,9 +1329,9 @@ public abstract class ExecutionEnvironment {
 	 * Sets a context environment factory, that creates the context environment for running programs
 	 * with pre-configured environments. Examples are running programs from the command line, and
 	 * running programs in the Scala shell.
-	 * 
+	 *
 	 * <p>When the context environment factory is set, no other environments can be explicitly used.
-	 * 
+	 *
 	 * @param ctx The context environment factory.
 	 */
 	protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
@@ -1354,7 +1350,7 @@ public abstract class ExecutionEnvironment {
 	/**
 	 * Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment
 	 * or a RemoteEnvironment.
-	 * 
+	 *
 	 * @return True, if it is possible to explicitly instantiate a LocalEnvironment or a
 	 *         RemoteEnvironment, false otherwise.
 	 */
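
A minimal sketch of how the ExecutionEnvironment methods documented in the Javadoc reformatted above fit together, assuming the flink-java DataSet API of this branch; the job name, parallelism, and output path are illustrative only:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class EnvironmentSketch {
	public static void main(String[] args) throws Exception {
		// Local execution when run standalone, cluster execution when submitted via the CLI client.
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(2); // overrides the environment's default parallelism

		// fromElements() creates a non-parallel source (parallelism 1); the element
		// type is extracted from the first element, as the Javadoc above describes.
		DataSet<String> words = env.fromElements("flink", "checkstyle", "javadoc");

		words.writeAsText("file:///tmp/words-out"); // registers a sink; executed lazily
		env.execute("word sink example");           // runs all registered sinks under this job name
	}
}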

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java
index b75835f..829049e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironmentFactory.java
@@ -25,10 +25,10 @@ import org.apache.flink.annotation.Public;
  */
 @Public
 public interface ExecutionEnvironmentFactory {
-	
+
 	/**
 	 * Creates an ExecutionEnvironment from this factory.
-	 * 
+	 *
 	 * @return An ExecutionEnvironment.
 	 */
 	ExecutionEnvironment createExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
index 0b2567a..dcb71cb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.api.java;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -31,28 +31,28 @@ import org.apache.flink.configuration.Configuration;
 /**
  * An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the
  * environment is instantiated.
- * 
+ *
  * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
  * parallelism can be set via {@link #setParallelism(int)}.
- * 
+ *
  * <p>Local environments can also be instantiated through {@link ExecutionEnvironment#createLocalEnvironment()}
  * and {@link ExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a
  * default parallelism equal to the number of hardware contexts in the local machine.
  */
 @Public
 public class LocalEnvironment extends ExecutionEnvironment {
-	
-	/** The user-defined configuration for the local execution */
+
+	/** The user-defined configuration for the local execution. */
 	private final Configuration configuration;
 
-	/** Create lazily upon first use */
+	/** Create lazily upon first use. */
 	private PlanExecutor executor;
 
 	/** In case we keep the executor alive for sessions, this reaper shuts it down eventually.
 	 * The reaper's finalize method triggers the executor shutdown. */
 	@SuppressWarnings("all")
 	private ExecutorReaper executorReaper;
-	
+
 	/**
 	 * Creates a new local environment.
 	 */
@@ -62,7 +62,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
 
 	/**
 	 * Creates a new local environment that configures its local executor with the given configuration.
-	 * 
+	 *
 	 * @param config The configuration used to configure the local executor.
 	 */
 	public LocalEnvironment(Configuration config) {
@@ -73,9 +73,9 @@ public class LocalEnvironment extends ExecutionEnvironment {
 		}
 		this.configuration = config == null ? new Configuration() : config;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		if (executor == null) {
@@ -93,11 +93,11 @@ public class LocalEnvironment extends ExecutionEnvironment {
 		this.lastJobExecutionResult = result;
 		return result;
 	}
-	
+
 	@Override
 	public String getExecutionPlan() throws Exception {
 		Plan p = createProgramPlan(null, false);
-		
+
 		// make sure that we do not start an executor in any case here.
 		// if one runs, fine, of not, we only create the class but disregard immediately afterwards
 		if (executor != null) {
@@ -118,15 +118,15 @@ public class LocalEnvironment extends ExecutionEnvironment {
 			// create also a new JobID
 			jobID = JobID.generate();
 		}
-		
+
 		// create a new local executor
 		executor = PlanExecutor.createLocalExecutor(configuration);
 		executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
-		
+
 		// if we have a session, start the mini cluster eagerly to have it available across sessions
 		if (getSessionTimeout() > 0) {
 			executor.start();
-			
+
 			// also install the reaper that will shut it down eventually
 			executorReaper = new ExecutorReaper(executor);
 		}
@@ -135,7 +135,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "Local Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism())
@@ -150,7 +150,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
 	 * This thread shuts down the local executor.
 	 *
 	 * <p><b>IMPORTANT:</b> This must be a static inner class to hold no reference to the outer class.
-	 * Otherwise, the outer class could never become garbage collectible while this thread runs.</p>
+	 * Otherwise, the outer class could never become garbage collectible while this thread runs.
 	 */
 	private static class ShutdownThread extends Thread {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index 34a54ba..fa223bd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.api.java;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
@@ -39,7 +39,7 @@ import java.util.List;
  * An {@link ExecutionEnvironment} that sends programs to a cluster for execution. The environment
  * needs to be created with the address and port of the JobManager of the Flink cluster that
  * should execute the programs.
- * 
+ *
  * <p>Many programs executed via the remote environment depend on additional classes. Such classes
  * may be the classes of functions (transformation, aggregation, ...) or libraries. Those classes
  * must be attached to the remote environment as JAR files, to allow the environment to ship the
@@ -47,26 +47,26 @@ import java.util.List;
  */
 @Public
 public class RemoteEnvironment extends ExecutionEnvironment {
-	
-	/** The hostname of the JobManager */
+
+	/** The hostname of the JobManager. */
 	protected final String host;
 
-	/** The port of the JobManager main actor system */
+	/** The port of the JobManager main actor system. */
 	protected final int port;
 
-	/** The jar files that need to be attached to each job */
+	/** The jar files that need to be attached to each job. */
 	protected final List<URL> jarFiles;
 
-	/** The configuration used by the client that connects to the cluster */
+	/** The configuration used by the client that connects to the cluster. */
 	protected Configuration clientConfiguration;
-	
-	/** The remote executor lazily created upon first use */
+
+	/** The remote executor lazily created upon first use. */
 	protected PlanExecutor executor;
-	
-	/** Optional shutdown hook, used in session mode to eagerly terminate the last session */
+
+	/** Optional shutdown hook, used in session mode to eagerly terminate the last session. */
 	private Thread shutdownHook;
 
-	/** The classpaths that need to be attached to each job */
+	/** The classpaths that need to be attached to each job. */
 	protected final List<URL> globalClasspaths;
 
 	/**
@@ -105,7 +105,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	/**
 	 * Creates a new RemoteEnvironment that points to the master (JobManager) described by the
 	 * given host name and port.
-	 * 
+	 *
 	 * <p>Each program execution will have all the given JAR files in its classpath.
 	 *
 	 * @param host The host name or address of the master (JobManager), where the program should be executed.
@@ -114,8 +114,8 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 	 * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
 	 *                 user-defined functions, user-defined input formats, or any libraries, those must be
 	 *                 provided in the JAR files.
-	 * @param globalClasspaths The paths of directories and JAR files that are added to each user code 
-	 *                 classloader on all nodes in the cluster. Note that the paths must specify a 
+	 * @param globalClasspaths The paths of directories and JAR files that are added to each user code
+	 *                 classloader on all nodes in the cluster. Note that the paths must specify a
 	 *                 protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
 	 *                 The protocol must be supported by the {@link java.net.URLClassLoader}.
 	 */
@@ -202,14 +202,14 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		jobID = JobID.generate();
 		installShutdownHook();
 	}
-	
+
 	protected PlanExecutor getExecutor() throws Exception {
 		if (executor == null) {
 			executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration,
 				jarFiles, globalClasspaths);
 			executor.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
 		}
-		
+
 		// if we are using sessions, we keep the executor running
 		if (getSessionTimeout() > 0 && !executor.isRunning()) {
 			executor.start();
@@ -237,7 +237,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 				LOG.warn("Exception while unregistering the cleanup shutdown hook.");
 			}
 		}
-		
+
 		try {
 			PlanExecutor executor = this.executor;
 			if (executor != null) {
@@ -249,13 +249,13 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 			throw new RuntimeException("Failed to dispose the session shutdown hook.");
 		}
 	}
-	
+
 	@Override
 	public String toString() {
 		return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
 				(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Shutdown hooks and reapers
 	// ------------------------------------------------------------------------
@@ -273,7 +273,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 					}
 				}
 			});
-	
+
 			try {
 				// Add JVM shutdown hook to call shutdown of service
 				Runtime.getRuntime().addShutdownHook(shutdownHook);
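
The constructor Javadoc in the hunks above describes how a RemoteEnvironment ships user-code JARs and additional classpath entries to the cluster. As a rough illustration of that API (a sketch, not part of this commit; host, port, and the JAR path are placeholder values), a client program would usually go through ExecutionEnvironment.createRemoteEnvironment rather than the constructor itself:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class RemoteExecutionSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder host, port and JAR path; 6123 is the default JobManager RPC port.
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                    "jobmanager.example.com", 6123, "/path/to/user-code.jar");

            DataSet<String> lines = env.fromElements("to be", "or not to be");
            lines.print();   // print() triggers execution on the remote cluster
        }
    }

The RemoteEnvironment constructor documented above additionally accepts globalClasspaths URLs (for example file:// paths on an NFS share) that are appended to the user-code classloader on every node.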

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index 36ccb23..44e176c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.java;
 
-import org.apache.commons.lang3.StringUtils;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
@@ -31,6 +29,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
@@ -43,7 +43,7 @@ import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAna
  */
 @Internal
 public final class Utils {
-	
+
 	public static final Random RNG = new Random();
 
 	public static String getCallLocationName() {
@@ -63,12 +63,12 @@ public final class Utils {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Utility sink function that counts elements and writes the count into an accumulator,
 	 * from which it can be retrieved by the client. This sink is used by the
 	 * {@link DataSet#count()} function.
-	 * 
+	 *
 	 * @param <T> Type of elements to count.
 	 */
 	@SkipCodeAnalysis
@@ -115,7 +115,7 @@ public final class Utils {
 
 		private final String id;
 		private final TypeSerializer<T> serializer;
-		
+
 		private SerializedListAccumulator<T> accumulator;
 
 		public CollectHelper(String id, TypeSerializer<T> serializer) {
@@ -143,6 +143,9 @@ public final class Utils {
 		}
 	}
 
+	/**
+	 * Accumulator of {@link ChecksumHashCode}.
+	 */
 	public static class ChecksumHashCode implements SimpleAccumulator<ChecksumHashCode> {
 
 		private static final long serialVersionUID = 1L;
@@ -213,6 +216,10 @@ public final class Utils {
 		}
 	}
 
+	/**
+	 * {@link RichOutputFormat} for {@link ChecksumHashCode}.
+	 * @param <T>
+	 */
 	@SkipCodeAnalysis
 	public static class ChecksumHashCodeHelper<T> extends RichOutputFormat<T> {
 
@@ -248,7 +255,6 @@ public final class Utils {
 		}
 	}
 
-
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -262,19 +268,19 @@ public final class Utils {
 	private static <T> String getSerializerTree(TypeInformation<T> ti, int indent) {
 		String ret = "";
 		if (ti instanceof CompositeType) {
-			ret += StringUtils.repeat(' ', indent) + ti.getClass().getSimpleName()+"\n";
+			ret += StringUtils.repeat(' ', indent) + ti.getClass().getSimpleName() + "\n";
 			CompositeType<T> cti = (CompositeType<T>) ti;
 			String[] fieldNames = cti.getFieldNames();
 			for (int i = 0; i < cti.getArity(); i++) {
 				TypeInformation<?> fieldType = cti.getTypeAt(i);
-				ret += StringUtils.repeat(' ', indent + 2) + fieldNames[i]+":"+getSerializerTree(fieldType, indent);
+				ret += StringUtils.repeat(' ', indent + 2) + fieldNames[i] + ":" + getSerializerTree(fieldType, indent);
 			}
 		} else {
 			if (ti instanceof GenericTypeInfo) {
-				ret += StringUtils.repeat(' ', indent) + "GenericTypeInfo ("+ti.getTypeClass().getSimpleName()+")\n";
+				ret += StringUtils.repeat(' ', indent) + "GenericTypeInfo (" + ti.getTypeClass().getSimpleName() + ")\n";
 				ret += getGenericTypeTree(ti.getTypeClass(), indent + 4);
 			} else {
-				ret += StringUtils.repeat(' ', indent) + ti.toString()+"\n";
+				ret += StringUtils.repeat(' ', indent) + ti.toString() + "\n";
 			}
 		}
 		return ret;
@@ -286,7 +292,7 @@ public final class Utils {
 			if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) {
 				continue;
 			}
-			ret += StringUtils.repeat(' ', indent) + field.getName() + ":" + field.getType().getName() + 
+			ret += StringUtils.repeat(' ', indent) + field.getName() + ":" + field.getType().getName() +
 				(field.getType().isEnum() ? " (is enum)" : "") + "\n";
 			if (!field.getType().isPrimitive()) {
 				ret += getGenericTypeTree(field.getType(), indent + 4);
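
The CountHelper Javadoc above explains the pattern behind DataSet#count(): a sink counts elements into an accumulator, and the client reads the result from the JobExecutionResult once the job finishes. A minimal sketch of the same round trip using the public accumulator API (the accumulator name "element-count" is arbitrary and not the identifier used internally by count()):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.accumulators.LongCounter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.configuration.Configuration;

    public class AccumulatorCountSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            env.generateSequence(1, 1000)
                .map(new RichMapFunction<Long, Long>() {
                    private final LongCounter counter = new LongCounter();

                    @Override
                    public void open(Configuration parameters) {
                        // Register the accumulator so the client can query it by name.
                        getRuntimeContext().addAccumulator("element-count", counter);
                    }

                    @Override
                    public Long map(Long value) {
                        counter.add(1L);   // count every element passing through
                        return value;
                    }
                })
                .output(new DiscardingOutputFormat<Long>());

            JobExecutionResult result = env.execute("accumulator count sketch");
            long count = result.getAccumulatorResult("element-count");
            System.out.println("counted " + count + " elements");
        }
    }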

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java b/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
index 039f4de..6088453 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -45,7 +46,7 @@ import java.util.List;
  */
 public abstract class SequentialFormatTestBase<T> extends TestLogger {
 
-	public class InputSplitSorter implements Comparator<FileInputSplit> {
+	private class InputSplitSorter implements Comparator<FileInputSplit> {
 		@Override
 		public int compare(FileInputSplit o1, FileInputSplit o2) {
 			int pathOrder = o1.getPath().getName().compareTo(o2.getPath().getName());
@@ -74,7 +75,7 @@ public abstract class SequentialFormatTestBase<T> extends TestLogger {
 	}
 
 	/**
-	 * Count how many bytes would be written if all records were directly serialized
+	 * Count how many bytes would be written if all records were directly serialized.
 	 */
 	@Before
 	public void calcRawDataSize() throws IOException {
@@ -83,7 +84,7 @@ public abstract class SequentialFormatTestBase<T> extends TestLogger {
 			ByteCounter byteCounter = new ByteCounter();
 
 			for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
-				writeRecord(this.getRecord(recordIndex), 
+				writeRecord(this.getRecord(recordIndex),
 					new DataOutputViewStreamWrapper(byteCounter));
 			}
 			this.rawDataSizes[fileIndex] = byteCounter.getLength();
@@ -91,7 +92,7 @@ public abstract class SequentialFormatTestBase<T> extends TestLogger {
 	}
 
 	/**
-	 * Checks if the expected input splits were created
+	 * Checks if the expected input splits were created.
 	 */
 	@Test
 	public void checkInputSplits() throws IOException {
@@ -124,7 +125,7 @@ public abstract class SequentialFormatTestBase<T> extends TestLogger {
 	}
 
 	/**
-	 * Tests if the expected sequence and amount of data can be read
+	 * Tests if the expected sequence and amount of data can be read.
 	 */
 	@Test
 	public void checkRead() throws Exception {
@@ -204,7 +205,7 @@ public abstract class SequentialFormatTestBase<T> extends TestLogger {
 			int recordIndex = 0;
 			for (int fileIndex = 0; fileIndex < this.parallelism; fileIndex++) {
 				BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI() + "/" +
-						(fileIndex+1), configuration);
+						(fileIndex + 1), configuration);
 				for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
 					output.writeRecord(this.getRecord(recordIndex));
 				}
@@ -233,27 +234,26 @@ public abstract class SequentialFormatTestBase<T> extends TestLogger {
 		}
 	}
 
-	abstract protected BinaryInputFormat<T> createInputFormat();
+	protected abstract BinaryInputFormat<T> createInputFormat();
 
-	abstract protected BinaryOutputFormat<T> createOutputFormat(final String path, final
-																Configuration configuration)
+	protected abstract BinaryOutputFormat<T> createOutputFormat(String path, Configuration configuration)
 			throws IOException;
 
-	abstract protected int getInfoSize();
+	protected abstract int getInfoSize();
 
 	/**
-	 * Returns the record to write at the given position
+	 * Returns the record to write at the given position.
 	 */
-	abstract protected T getRecord(int index);
+	protected abstract T getRecord(int index);
 
-	abstract protected T createInstance();
+	protected abstract T createInstance();
 
-	abstract protected void writeRecord(T record, DataOutputView outputView) throws IOException;
+	protected abstract void writeRecord(T record, DataOutputView outputView) throws IOException;
 
 	/**
-	 * Checks if both records are equal
+	 * Checks if both records are equal.
 	 */
-	abstract protected void checkEquals(T expected, T actual);
+	protected abstract void checkEquals(T expected, T actual);
 
 	private int getExpectedBlockCount(int fileIndex) {
 		int expectedBlockCount =
@@ -278,14 +278,14 @@ public abstract class SequentialFormatTestBase<T> extends TestLogger {
 
 	/**
 	 * Counts the bytes that would be written.
-	 * 
+	 *
 	 */
 	private static final class ByteCounter extends OutputStream {
 		int length = 0;
 
 		/**
 		 * Returns the length.
-		 * 
+		 *
 		 * @return the length
 		 */
 		public int getLength() {
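
Several hunks above reorder "abstract protected" to "protected abstract". That is the modifier order recommended by the Java Language Specification and enforced by checkstyle's ModifierOrder check (annotations first, then access modifiers, then abstract, static, final, and so on). A tiny before/after illustration, not taken from the commit:

    abstract class FormatSketch<T> {

        // discouraged:  abstract protected T getRecord(int index);
        // preferred:    access modifier first, then "abstract"
        protected abstract T getRecord(int index);

        // the same ordering applies to field modifiers
        protected static final int DEFAULT_BLOCK_SIZE = 64 * 1024;
    }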

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
index ac1e19c..cfcfdfd 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/io/SerializedFormatTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.runner.RunWith;
@@ -32,6 +33,9 @@ import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 
+/**
+ * Tests for serialized formats.
+ */
 @RunWith(Parameterized.class)
 public class SerializedFormatTest extends SequentialFormatTestBase<Record> {
 
@@ -58,11 +62,9 @@ public class SerializedFormatTest extends SequentialFormatTestBase<Record> {
 		return inputFormat;
 	}
 
-	
 	@Override
 	protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration configuration)
-			throws IOException
-	{
+			throws IOException {
 		final SerializedOutputFormat<Record> outputFormat = new SerializedOutputFormat<Record>();
 		outputFormat.setOutputFilePath(new Path(path));
 		outputFormat.setWriteMode(FileSystem.WriteMode.OVERWRITE);

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java
index dce9713..821d956 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionAccumulatorsTest.java
@@ -18,53 +18,59 @@
 
 package org.apache.flink.api.common.operators;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link CollectionExecutor} with accumulators.
+ */
 public class CollectionExecutionAccumulatorsTest {
 
 	private static final String ACCUMULATOR_NAME = "TEST ACC";
-	
+
 	@Test
 	public void testAccumulator() {
 		try {
-			final int NUM_ELEMENTS = 100;
-			
+			final int numElements = 100;
+
 			ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-			
-			env.generateSequence(1, NUM_ELEMENTS)
+
+			env.generateSequence(1, numElements)
 				.map(new CountingMapper())
 				.output(new DiscardingOutputFormat<Long>());
-			
+
 			JobExecutionResult result = env.execute();
-			
+
 			assertTrue(result.getNetRuntime() >= 0);
 
-			assertEquals(NUM_ELEMENTS, (int) result.getAccumulatorResult(ACCUMULATOR_NAME));
+			assertEquals(numElements, (int) result.getAccumulatorResult(ACCUMULATOR_NAME));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@SuppressWarnings("serial")
-	public static class CountingMapper extends RichMapFunction<Long, Long> {
-		
+	private static class CountingMapper extends RichMapFunction<Long, Long> {
+
 		private IntCounter accumulator;
-		
+
 		@Override
 		public void open(Configuration parameters) {
 			accumulator = getRuntimeContext().getIntCounter(ACCUMULATOR_NAME);
 		}
-		
+
 		@Override
 		public Long map(Long value) {
 			accumulator.add(1);
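
Besides the Javadoc and naming fixes, the hunks above and in the other touched files rearrange imports into the grouping the flink-java checkstyle rules expect: Flink classes first, then other third-party libraries, then java/javax, with static imports at the bottom and a blank line between groups. Roughly as follows (the authoritative group definition lives in the project's checkstyle configuration; this file is only an illustration and its imports are intentionally unused):

    package org.apache.flink.example;                        // hypothetical package

    import org.apache.flink.api.java.ExecutionEnvironment;   // 1) Flink classes

    import org.junit.Test;                                    // 2) other third-party libraries

    import java.util.List;                                    // 3) java / javax

    import static org.junit.Assert.assertEquals;              // 4) static imports last

    public class ImportOrderSketch { }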

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java
index 6ca0eb1..ddd0894 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionIterationTest.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.api.common.operators;
 
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
@@ -36,8 +30,18 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link CollectionExecutor} with iterations.
+ */
 @SuppressWarnings("serial")
 public class CollectionExecutionIterationTest implements java.io.Serializable {
 
@@ -45,16 +49,16 @@ public class CollectionExecutionIterationTest implements java.io.Serializable {
 	public void testBulkIteration() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-			
+
 			IterativeDataSet<Integer> iteration = env.fromElements(1).iterate(10);
-			
+
 			DataSet<Integer> result = iteration.closeWith(iteration.map(new AddSuperstepNumberMapper()));
-			
+
 			List<Integer> collected = new ArrayList<Integer>();
 			result.output(new LocalCollectionOutputFormat<Integer>(collected));
-			
+
 			env.execute();
-			
+
 			assertEquals(1, collected.size());
 			assertEquals(56, collected.get(0).intValue());
 		}
@@ -63,14 +67,14 @@ public class CollectionExecutionIterationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testBulkIterationWithTerminationCriterion() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-			
+
 			IterativeDataSet<Integer> iteration = env.fromElements(1).iterate(100);
-			
+
 			DataSet<Integer> iterationResult = iteration.map(new AddSuperstepNumberMapper());
 
 			DataSet<Integer> terminationCriterion = iterationResult.filter(new FilterFunction<Integer>() {
@@ -78,14 +82,14 @@ public class CollectionExecutionIterationTest implements java.io.Serializable {
 					return value < 50;
 				}
 			});
-			
+
 			List<Integer> collected = new ArrayList<Integer>();
-			
+
 			iteration.closeWith(iterationResult, terminationCriterion)
 					.output(new LocalCollectionOutputFormat<Integer>(collected));
-			
+
 			env.execute();
-			
+
 			assertEquals(1, collected.size());
 			assertEquals(56, collected.get(0).intValue());
 		}
@@ -106,7 +110,7 @@ public class CollectionExecutionIterationTest implements java.io.Serializable {
 					new Tuple2<Integer, Integer>(2, 0),
 					new Tuple2<Integer, Integer>(3, 0),
 					new Tuple2<Integer, Integer>(4, 0));
-			
+
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple1<Integer>> workInput = env.fromElements(
 					new Tuple1<Integer>(1),
@@ -114,7 +118,6 @@ public class CollectionExecutionIterationTest implements java.io.Serializable {
 					new Tuple1<Integer>(3),
 					new Tuple1<Integer>(4));
 
-
 			// Perform a delta iteration where we add those values to the workset where
 			// the second tuple field is smaller than the first tuple field.
 			// At the end both tuple fields must be the same.
@@ -144,7 +147,6 @@ public class CollectionExecutionIterationTest implements java.io.Serializable {
 				}
 			});
 
-
 			List<Tuple2<Integer, Integer>> collected = new ArrayList<Tuple2<Integer, Integer>>();
 
 			iteration.closeWith(solDelta, nextWorkset)
@@ -162,9 +164,9 @@ public class CollectionExecutionIterationTest implements java.io.Serializable {
 			fail(e.getMessage());
 		}
 	}
-	
-	public static class AddSuperstepNumberMapper extends RichMapFunction<Integer, Integer> {
-		
+
+	private static class AddSuperstepNumberMapper extends RichMapFunction<Integer, Integer> {
+
 		@Override
 		public Integer map(Integer value) {
 			int superstep = getIterationRuntimeContext().getSuperstepNumber();

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java
index 2cdd68f..096e309 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/CollectionExecutionWithBroadcastVariableTest.java
@@ -18,42 +18,48 @@
 
 package org.apache.flink.api.common.operators;
 
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.RichCrossFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.Configuration;
+
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link CollectionExecutor} with broadcasted variables.
+ */
 @SuppressWarnings("serial")
 public class CollectionExecutionWithBroadcastVariableTest {
 
 	private static final String BC_VAR_NAME = "BC";
-	
-	private final String[] TEST_DATA = { "A", "B", "C", "D" };
-	private final String SUFFIX = "-suffixed";
-	
+
+	private static final String[] TEST_DATA = { "A", "B", "C", "D" };
+	private static final String SUFFIX = "-suffixed";
+
 	@Test
 	public void testUnaryOp() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-			
+
 			DataSet<String> bcData = env.fromElements(SUFFIX);
-			
+
 			List<String> result = new ArrayList<String>();
-			
+
 			env.fromElements(TEST_DATA)
 					.map(new SuffixAppender()).withBroadcastSet(bcData, BC_VAR_NAME)
 					.output(new LocalCollectionOutputFormat<String>(result));
-			
+
 			env.execute();
-			
+
 			assertEquals(TEST_DATA.length, result.size());
 			for (String s : result) {
 				assertTrue(s.indexOf(SUFFIX) > 0);
@@ -64,22 +70,22 @@ public class CollectionExecutionWithBroadcastVariableTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testBinaryOp() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
-			
+
 			DataSet<String> bcData = env.fromElements(SUFFIX);
 			DataSet<String> inData = env.fromElements(TEST_DATA);
-			
+
 			List<String> result = new ArrayList<String>();
-			
+
 			inData.cross(inData).with(new SuffixCross()).withBroadcastSet(bcData, BC_VAR_NAME)
 					.output(new LocalCollectionOutputFormat<String>(result));
-			
+
 			env.execute();
-			
+
 			assertEquals(TEST_DATA.length * TEST_DATA.length, result.size());
 			for (String s : result) {
 				assertTrue(s.indexOf(SUFFIX) == 2);
@@ -90,31 +96,31 @@ public class CollectionExecutionWithBroadcastVariableTest {
 			fail(e.getMessage());
 		}
 	}
-	
-	public static final class SuffixAppender extends RichMapFunction<String, String> {
-		
+
+	private static final class SuffixAppender extends RichMapFunction<String, String> {
+
 		private String suffix;
-		
+
 		@Override
 		public void open(Configuration parameters) {
 			suffix = getRuntimeContext().<String>getBroadcastVariable(BC_VAR_NAME).get(0);
 		}
-		
+
 		@Override
 		public String map(String value) {
 			return value + suffix;
 		}
 	}
-	
-	public static final class SuffixCross extends RichCrossFunction<String, String, String> {
-		
+
+	private static final class SuffixCross extends RichCrossFunction<String, String, String> {
+
 		private String suffix;
-		
+
 		@Override
 		public void open(Configuration parameters) {
 			suffix = getRuntimeContext().<String>getBroadcastVariable(BC_VAR_NAME).get(0);
 		}
-		
+
 		@Override
 		public String cross(String s1, String s2) {
 			return s1 + s2 + suffix;

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index a4426e0..e1e393b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -46,6 +46,9 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Future;
 
+/**
+ * Tests for {@link CoGroupOperatorBase} on collections.
+ */
 @SuppressWarnings("serial")
 public class CoGroupOperatorCollectionTest implements Serializable {
 
@@ -94,7 +97,7 @@ public class CoGroupOperatorCollectionTest implements Serializable {
 
 				Assert.assertTrue(udf1.isClosed);
 				Assert.assertTrue(udf2.isClosed);
-				
+
 				Set<Tuple2<String, Integer>> expected = new HashSet<Tuple2<String, Integer>>(
 						Arrays.asList(new Tuple2Builder<String, Integer>()
 										.add("foo", 8)

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index d0784a8..d788efd 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.Collector;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -44,8 +45,13 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Arrays.asList;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link GroupReduceFunction}.
+ */
 @SuppressWarnings({"serial", "unchecked"})
 public class GroupReduceOperatorTest implements java.io.Serializable {
 
@@ -58,7 +64,7 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 
 				@Override
 				public void reduce(Iterable<Tuple2<String, Integer>> values,
-				                   Collector<Tuple2<String, Integer>> out) throws Exception {
+									Collector<Tuple2<String, Integer>> out) throws Exception {
 					Iterator<Tuple2<String, Integer>> input = values.iterator();
 
 					Tuple2<String, Integer> result = input.next();
@@ -86,13 +92,12 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
 					Integer>("bar", 4)));
 
-
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
 			List<Tuple2<String, Integer>> resultMutableSafe = op.executeOnCollections(input, null, executionConfig);
 			executionConfig.enableObjectReuse();
 			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input, null, executionConfig);
-			
+
 			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
 			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
 
@@ -120,10 +125,9 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 					Integer>> reducer = new RichGroupReduceFunction<Tuple2<String, Integer>,
 					Tuple2<String, Integer>>() {
 
-
 				@Override
 				public void reduce(Iterable<Tuple2<String, Integer>> values,
-				                   Collector<Tuple2<String, Integer>> out) throws Exception {
+									Collector<Tuple2<String, Integer>> out) throws Exception {
 					Iterator<Tuple2<String, Integer>> input = values.iterator();
 
 					Tuple2<String, Integer> result = input.next();
@@ -175,7 +179,7 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 							new HashMap<String, Accumulator<?, ?>>(),
 							new UnregisteredMetricsGroup()),
 					executionConfig);
-			
+
 			executionConfig.enableObjectReuse();
 			List<Tuple2<String, Integer>> resultRegular = op.executeOnCollections(input,
 					new RuntimeUDFContext(taskInfo, null, executionConfig,
@@ -183,8 +187,7 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 							new HashMap<String, Accumulator<?, ?>>(),
 							new UnregisteredMetricsGroup()),
 					executionConfig);
-			
-			
+
 			Set<Tuple2<String, Integer>> resultSetMutableSafe = new HashSet<Tuple2<String, Integer>>(resultMutableSafe);
 			Set<Tuple2<String, Integer>> resultSetRegular = new HashSet<Tuple2<String, Integer>>(resultRegular);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8624c290/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
index ef33ac0..23d0e65 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
@@ -45,15 +45,16 @@ import java.util.concurrent.Future;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link InnerJoinOperatorBase}.
+ */
 @SuppressWarnings({ "unchecked", "serial" })
 public class InnerJoinOperatorBaseTest implements Serializable {
 
-	
 	@Test
 	public void testTupleBaseJoiner(){
 		final FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>> joiner =
-					new FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>>()
-		{
+					new FlatJoinFunction<Tuple3<String, Double, Integer>, Tuple2<Integer, String>, Tuple2<Double, String>>() {
 			@Override
 			public void join(Tuple3<String, Double, Integer> first, Tuple2<Integer, String> second, Collector<Tuple2<Double, String>> out) {
 
@@ -71,8 +72,8 @@ public class InnerJoinOperatorBaseTest implements Serializable {
 		final TupleTypeInfo<Tuple2<Double, String>> outTypeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Double.class,
 				String.class);
 
-		final int[] leftKeys = new int[]{0,2};
-		final int[] rightKeys = new int[]{1,0};
+		final int[] leftKeys = new int[]{0, 2};
+		final int[] rightKeys = new int[]{1, 0};
 
 		final String taskName = "Collection based tuple joiner";
 
@@ -109,7 +110,7 @@ public class InnerJoinOperatorBaseTest implements Serializable {
 		try {
 			final TaskInfo taskInfo = new TaskInfo("op", 1, 0, 1, 0);
 			ExecutionConfig executionConfig = new ExecutionConfig();
-			
+
 			executionConfig.disableObjectReuse();
 			List<Tuple2<Double, String>> resultSafe = base.executeOnCollections(inputData1, inputData2,
 					new RuntimeUDFContext(taskInfo, null, executionConfig,
@@ -117,7 +118,7 @@ public class InnerJoinOperatorBaseTest implements Serializable {
 							new HashMap<String, Accumulator<?, ?>>(),
 							new UnregisteredMetricsGroup()),
 					executionConfig);
-			
+
 			executionConfig.enableObjectReuse();
 			List<Tuple2<Double, String>> resultRegular = base.executeOnCollections(inputData1, inputData2,
 					new RuntimeUDFContext(taskInfo, null, executionConfig,

