flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [11/23] flink git commit: [FLINK-6107] Enable Javadoc checks in streaming checkstyle
Date Wed, 26 Apr 2017 10:07:33 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 334851e..6104b35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -85,20 +85,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
  * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
  *
- * <p>
- * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
- * different points for each key.
+ * <p>The windows are conceptually evaluated for each key individually, meaning windows can trigger
+ * at different points for each key.
  *
- * <p>
- * If an {@link Evictor} is specified it will be used to evict elements from the window after
+ * <p>If an {@link Evictor} is specified it will be used to evict elements from the window after
  * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
  * When using an evictor window performance will degrade significantly, since
  * incremental aggregation of window results cannot be used.
  *
- * <p>
- * Note that the {@code WindowedStream} is purely and API construct, during runtime
- * the {@code WindowedStream} will be collapsed together with the
- * {@code KeyedStream} and the operation over the window into one single operation.
+ * <p>Note that the {@code WindowedStream} is purely an API construct, during runtime the
+ * {@code WindowedStream} will be collapsed together with the {@code KeyedStream} and the operation
+ * over the window into one single operation.
  *
  * @param <T> The type of elements in the stream.
  * @param <K> The type of the key by which elements are grouped.
@@ -107,10 +104,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Public
 public class WindowedStream<T, K, W extends Window> {
 
-	/** The keyed data stream that is windowed by this stream */
+	/** The keyed data stream that is windowed by this stream. */
 	private final KeyedStream<T, K> input;
 
-	/** The window assigner */
+	/** The window assigner. */
 	private final WindowAssigner<? super T, W> windowAssigner;
 
 	/** The trigger that is used for window evaluation/emission. */
@@ -189,8 +186,7 @@ public class WindowedStream<T, K, W extends Window> {
 	/**
 	 * Sets the {@code Evictor} that should be used to evict elements from a window before emission.
 	 *
-	 * <p>
-	 * Note: When using an evictor window performance will degrade significantly, since
+	 * <p>Note: When using an evictor window performance will degrade significantly, since
 	 * incremental aggregation of window results cannot be used.
 	 */
 	@PublicEvolving
@@ -216,11 +212,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 * of the window for each key individually. The output of the reduce function is interpreted
 	 * as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * This window will try and incrementally aggregate data as much as the window policies permit.
-	 * For example, tumbling time windows can aggregate the data, meaning that only one element per
-	 * key is stored. Sliding time windows will aggregate on the granularity of the slide interval,
-	 * so a few elements are stored per key (one per slide interval).
+	 * <p>This window will try and incrementally aggregate data as much as the window policies
+	 * permit. For example, tumbling time windows can aggregate the data, meaning that only one
+	 * element per key is stored. Sliding time windows will aggregate on the granularity of the
+	 * slide interval, so a few elements are stored per key (one per slide interval).
 	 * Custom windows may not be able to incrementally aggregate, or may need to store extra values
 	 * in an aggregation tree.
 	 *
@@ -254,8 +249,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Arriving data is incrementally aggregated using the given reducer.
+	 * <p>Arriving data is incrementally aggregated using the given reducer.
 	 *
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
@@ -271,8 +265,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Arriving data is incrementally aggregated using the given reducer.
+	 * <p>Arriving data is incrementally aggregated using the given reducer.
 	 *
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
@@ -292,8 +285,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Arriving data is incrementally aggregated using the given reducer.
+	 * <p>Arriving data is incrementally aggregated using the given reducer.
 	 *
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
@@ -318,8 +310,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Arriving data is incrementally aggregated using the given reducer.
+	 * <p>Arriving data is incrementally aggregated using the given reducer.
 	 *
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
@@ -535,8 +526,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Arriving data is incrementally aggregated using the given fold function.
+	 * <p>Arriving data is incrementally aggregated using the given fold function.
 	 *
 	 * @param initialValue The initial value of the fold.
 	 * @param foldFunction The fold function that is used for incremental aggregation.
@@ -560,8 +550,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Arriving data is incrementally aggregated using the given fold function.
+	 * <p>Arriving data is incrementally aggregated using the given fold function.
 	 *
 	 * @param initialValue The initial value of the fold.
 	 * @param foldFunction The fold function that is used for incremental aggregation.
@@ -1084,8 +1073,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Not that this function requires that all data in the windows is buffered until the window
+	 * <p>Note that this function requires that all data in the windows is buffered until the window
 	 * is evaluated, as the function provides no means of incremental aggregation.
 	 *
 	 * @param function The window function.
@@ -1103,8 +1091,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Note that this function requires that all data in the windows is buffered until the window
+	 * <p>Note that this function requires that all data in the windows is buffered until the window
 	 * is evaluated, as the function provides no means of incremental aggregation.
 	 *
 	 * @param function The window function.
@@ -1122,8 +1109,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Not that this function requires that all data in the windows is buffered until the window
+	 * <p>Note that this function requires that all data in the windows is buffered until the window
 	 * is evaluated, as the function provides no means of incremental aggregation.
 	 *
 	 * @param function The window function.
@@ -1142,8 +1128,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Not that this function requires that all data in the windows is buffered until the window
+	 * <p>Note that this function requires that all data in the windows is buffered until the window
 	 * is evaluated, as the function provides no means of incremental aggregation.
 	 *
 	 * @param function The window function.
@@ -1221,8 +1206,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Arriving data is incrementally aggregated using the given reducer.
+	 * <p>Arriving data is incrementally aggregated using the given reducer.
 	 *
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
@@ -1244,8 +1228,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Arriving data is incrementally aggregated using the given reducer.
+	 * <p>Arriving data is incrementally aggregated using the given reducer.
 	 *
 	 * @param reduceFunction The reduce function that is used for incremental aggregation.
 	 * @param function The window function.
@@ -1321,8 +1304,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Arriving data is incrementally aggregated using the given fold function.
+	 * <p>Arriving data is incrementally aggregated using the given fold function.
 	 *
 	 * @param initialValue The initial value of the fold.
 	 * @param foldFunction The fold function that is used for incremental aggregation.
@@ -1345,8 +1327,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * evaluation of the window for each key individually. The output of the window function is
 	 * interpreted as a regular non-windowed stream.
 	 *
-	 * <p>
-	 * Arriving data is incrementally aggregated using the given fold function.
+	 * <p>Arriving data is incrementally aggregated using the given fold function.
 	 *
 	 * @param initialValue The initial value of the fold.
 	 * @param foldFunction The fold function that is used for incremental aggregation.
@@ -1434,13 +1415,11 @@ public class WindowedStream<T, K, W extends Window> {
 	}
 
 	/**
-	 * Applies an aggregation that sums every window of the pojo data stream at
-	 * the given field for every window.
+	 * Applies an aggregation that sums every window of the pojo data stream at the given field for
+	 * every window.
 	 *
-	 * <p>
-	 * A field expression is either
-	 * the name of a public field or a getter method with parentheses of the
-	 * stream's underlying type. A dot can be used to drill down into objects,
+	 * <p>A field expression is either the name of a public field or a getter method with
+	 * parentheses of the stream's underlying type. A dot can be used to drill down into objects,
 	 * as in {@code "field1.getInnerField2()" }.
 	 *
 	 * @param field The field to sum
@@ -1465,9 +1444,7 @@ public class WindowedStream<T, K, W extends Window> {
 	 * Applies an aggregation that gives the minimum value of the pojo data
 	 * stream at the given field expression for every window.
 	 *
-	 * <p>
-	 * A field
-	 * expression is either the name of a public field or a getter method with
+	 * <p>A field expression is either the name of a public field or a getter method with
 	 * parentheses of the {@link DataStream}S underlying type. A dot can be used
 	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index eb7833a..1cb48a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -35,16 +35,16 @@ public class CheckpointConfig implements java.io.Serializable {
 
 	private static final long serialVersionUID = -750378776078908147L;
 
-	/** The default checkpoint mode: exactly once */
+	/** The default checkpoint mode: exactly once. */
 	public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
 
-	/** The default timeout of a checkpoint attempt: 10 minutes */
+	/** The default timeout of a checkpoint attempt: 10 minutes. */
 	public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
 
-	/** The default minimum pause to be made between checkpoints: none */
+	/** The default minimum pause to be made between checkpoints: none. */
 	public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
 
-	/** The default limit of concurrently happening checkpoints: one */
+	/** The default limit of concurrently happening checkpoints: one. */
 	public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
 
 	// ------------------------------------------------------------------------
@@ -52,19 +52,19 @@ public class CheckpointConfig implements java.io.Serializable {
 	/** Checkpointing mode (exactly-once vs. at-least-once). */
 	private CheckpointingMode checkpointingMode = DEFAULT_MODE;
 
-	/** Periodic checkpoint triggering interval */
+	/** Periodic checkpoint triggering interval. */
 	private long checkpointInterval = -1; // disabled
 
-	/** Maximum time checkpoint may take before being discarded */
+	/** Maximum time checkpoint may take before being discarded. */
 	private long checkpointTimeout = DEFAULT_TIMEOUT;
 
-	/** Minimal pause between checkpointing attempts */
+	/** Minimal pause between checkpointing attempts. */
 	private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
 
-	/** Maximum number of checkpoint attempts in progress at the same time */
+	/** Maximum number of checkpoint attempts in progress at the same time. */
 	private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
 
-	/** Flag to force checkpointing in iterative jobs */
+	/** Flag to force checkpointing in iterative jobs. */
 	private boolean forceCheckpointing;
 
 	/** Cleanup behaviour for persistent checkpoints. */

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 61ca55f..0a4209f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -46,7 +46,7 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);
 
-	/** The configuration to use for the mini cluster */
+	/** The configuration to use for the mini cluster. */
 	private final Configuration conf;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 566beba..5849559 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -48,7 +48,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class);
 	
-	/** The configuration to use for the local cluster */
+	/** The configuration to use for the local cluster. */
 	private final Configuration conf;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 333f9c0..28a0675 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -41,24 +41,27 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * A {@link StreamExecutionEnvironment} for executing on a cluster.
+ */
 @Public
 public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
 
-	/** The hostname of the JobManager */
+	/** The hostname of the JobManager. */
 	private final String host;
 
-	/** The port of the JobManager main actor system */
+	/** The port of the JobManager main actor system. */
 	private final int port;
 
-	/** The configuration used to parametrize the client that connects to the remote cluster */
+	/** The configuration used to parametrize the client that connects to the remote cluster. */
 	private final Configuration clientConfiguration;
 
-	/** The jar files that need to be attached to each job */
+	/** The jar files that need to be attached to each job. */
 	private final List<URL> jarFiles;
 	
-	/** The classpaths that need to be attached to each job */
+	/** The classpaths that need to be attached to each job. */
 	private final List<URL> globalClasspaths;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 49c5347..52d2c4b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -30,6 +30,11 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Special {@link StreamExecutionEnvironment} that will be used in cases where the CLI client or
+ * testing utilities create a {@link StreamExecutionEnvironment} that should be used when
+ * {@link StreamExecutionEnvironment#getExecutionEnvironment()} is called.
+ */
 @PublicEvolving
 public class StreamContextEnvironment extends StreamExecutionEnvironment {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e836616..354eb4a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -100,27 +100,29 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Public
 public abstract class StreamExecutionEnvironment {
 
-	/** The default name to use for a streaming job if no other name has been specified */
+	/** The default name to use for a streaming job if no other name has been specified. */
 	public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
 
-	/** The time characteristic that is used if none other is set */
+	/** The time characteristic that is used if none other is set. */
 	private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
 
-	/** The default buffer timeout (max delay of records in the network stack) */
+	/** The default buffer timeout (max delay of records in the network stack). */
 	private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
 
-	/** The environment of the context (local by default, cluster if invoked through command line) */
+	/**
+	 * The environment of the context (local by default, cluster if invoked through command line).
+	 */
 	private static StreamExecutionEnvironmentFactory contextEnvironmentFactory;
 
-	/** The default parallelism used when creating a local environment */
+	/** The default parallelism used when creating a local environment. */
 	private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
 
 	// ------------------------------------------------------------------------
 
-	/** The execution configuration for this environment */
+	/** The execution configuration for this environment. */
 	private final ExecutionConfig config = new ExecutionConfig();
 
-	/** Settings that control the checkpointing behavior */
+	/** Settings that control the checkpointing behavior. */
 	private final CheckpointConfig checkpointCfg = new CheckpointConfig();
 
 	protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
@@ -129,10 +131,10 @@ public abstract class StreamExecutionEnvironment {
 
 	protected boolean isChainingEnabled = true;
 
-	/** The state backend used for storing k/v state and state snapshots */
+	/** The state backend used for storing k/v state and state snapshots. */
 	private AbstractStateBackend defaultStateBackend;
 
-	/** The time characteristic used by the data streams */
+	/** The time characteristic used by the data streams. */
 	private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
 
 
@@ -168,12 +170,14 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive) is Short.MAX_VALUE.
+	 * Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive)
+	 * is Short.MAX_VALUE.
 	 *
-	 * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+	 * <p>The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
 	 * defines the number of key groups used for partitioned state.
 	 *
-	 * @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15 - 1
+	 * @param maxParallelism Maximum degree of parallelism to be used for the program,
+	 *              with 0 < maxParallelism <= 2^15 - 1
 	 */
 	public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
 		Preconditions.checkArgument(maxParallelism > 0 &&
@@ -200,7 +204,7 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Gets the maximum degree of parallelism defined for the program.
 	 *
-	 * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+	 * <p>The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
 	 * defines the number of key groups used for partitioned state.
 	 *
 	 * @return Maximum degree of parallelism
@@ -214,15 +218,12 @@ public abstract class StreamExecutionEnvironment {
 	 * output buffers. By default the output buffers flush frequently to provide
 	 * low latency and to aid smooth developer experience. Setting the parameter
 	 * can result in three logical modes:
-	 * <p>
+	 *
 	 * <ul>
-	 * <li>
-	 * A positive integer triggers flushing periodically by that integer</li>
-	 * <li>
-	 * 0 triggers flushing after every record thus minimizing latency</li>
-	 * <li>
-	 * -1 triggers flushing only when the output buffer is full thus maximizing
-	 * throughput</li>
+	 *   <li>A positive integer triggers flushing periodically by that integer</li>
+	 *   <li>0 triggers flushing after every record thus minimizing latency</li>
+	 *   <li>-1 triggers flushing only when the output buffer is full thus maximizing
+	 *      throughput</li>
 	 * </ul>
 	 *
 	 * @param timeoutMillis
@@ -292,12 +293,12 @@ public abstract class StreamExecutionEnvironment {
 	 * {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
 	 *
 	 * <p>The job draws checkpoints periodically, in the given interval. The state will be
-	 * stored in the configured state backend.</p>
+	 * stored in the configured state backend.
 	 *
 	 * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
 	 * the moment. For that reason, iterative jobs will not be started if used
 	 * with enabled checkpointing. To override this mechanism, use the
-	 * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
+	 * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.
 	 *
 	 * @param interval Time interval between state checkpoints in milliseconds.
 	 */
@@ -313,12 +314,12 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * <p>The job draws checkpoints periodically, in the given interval. The system uses the
 	 * given {@link CheckpointingMode} for the checkpointing ("exactly once" vs "at least once").
-	 * The state will be stored in the configured state backend.</p>
+	 * The state will be stored in the configured state backend.
 	 *
 	 * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
 	 * the moment. For that reason, iterative jobs will not be started if used
 	 * with enabled checkpointing. To override this mechanism, use the
-	 * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
+	 * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.
 	 *
 	 * @param interval
 	 *             Time interval between state checkpoints in milliseconds.
@@ -337,11 +338,11 @@ public abstract class StreamExecutionEnvironment {
 	 * dataflow will be restarted from the latest completed checkpoint.
 	 *
 	 * <p>The job draws checkpoints periodically, in the given interval. The state will be
-	 * stored in the configured state backend.</p>
+	 * stored in the configured state backend.
 	 *
 	 * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
 	 * the moment. If the "force" parameter is set to true, the system will execute the
-	 * job nonetheless.</p>
+	 * job nonetheless.
 	 *
 	 * @param interval
 	 *            Time interval between state checkpoints in millis.
@@ -370,12 +371,12 @@ public abstract class StreamExecutionEnvironment {
 	 * {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
 	 *
 	 * <p>The job draws checkpoints periodically, in the default interval. The state will be
-	 * stored in the configured state backend.</p>
+	 * stored in the configured state backend.
 	 *
 	 * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
 	 * the moment. For that reason, iterative jobs will not be started if used
 	 * with enabled checkpointing. To override this mechanism, use the
-	 * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
+	 * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.
 	 *
 	 * @deprecated Use {@link #enableCheckpointing(long)} instead.
 	 */
@@ -520,8 +521,8 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Adds a new Kryo default serializer to the Runtime.
-	 * <p>
-	 * Note that the serializer instance must be serializable (as defined by
+	 *
+	 * <p>Note that the serializer instance must be serializable (as defined by
 	 * java.io.Serializable), because it may be distributed to the worker nodes
 	 * by java serialization.
 	 *
@@ -548,8 +549,8 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Registers the given type with a Kryo Serializer.
-	 * <p>
-	 * Note that the serializer instance must be serializable (as defined by
+	 *
+	 * <p>Note that the serializer instance must be serializable (as defined by
 	 * java.io.Serializable), because it may be distributed to the worker nodes
 	 * by java serialization.
 	 *
@@ -564,7 +565,7 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Registers the given Serializer via its class as a serializer for the
-	 * given type at the KryoSerializer
+	 * given type at the KryoSerializer.
 	 *
 	 * @param type
 	 * 		The class of the types serialized with the given serializer.
@@ -607,8 +608,7 @@ public abstract class StreamExecutionEnvironment {
 	 * Sets the time characteristic for all streams create from this environment, e.g., processing
 	 * time, event time, or ingestion time.
 	 *
-	 * <p>
-	 * If you set the characteristic to IngestionTime of EventTime this will set a default
+	 * <p>If you set the characteristic to IngestionTime or EventTime this will set a default
 	 * watermark update interval of 200 ms. If this is not applicable for your application
 	 * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
 	 *
@@ -660,15 +660,15 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a new data stream that contains the given elements. The elements must all be of the same type, for
-	 * example, all of the {@link String} or {@link Integer}.
-	 * <p>
-	 * The framework will try and determine the exact type from the elements. In case of generic elements, it may be
-	 * necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
-	 * org.apache.flink.api.common.typeinfo.TypeInformation)}.
-	 * <p>
-	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
-	 * degree of parallelism one.
+	 * Creates a new data stream that contains the given elements. The elements must all be of the
+	 * same type, for example, all of the {@link String} or {@link Integer}.
+	 *
+	 * <p>The framework will try and determine the exact type from the elements. In case of generic
+	 * elements, it may be necessary to manually supply the type information via
+	 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
+	 *
+	 * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data
+	 * stream source with a degree of parallelism one.
 	 *
 	 * @param data
 	 * 		The array of elements to create the data stream from.
@@ -733,10 +733,10 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * <p>The framework will try and determine the exact type from the collection elements. In case of generic
 	 * elements, it may be necessary to manually supply the type information via
-	 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.</p>
+	 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
 	 *
 	 * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
-	 * parallelism one.</p>
+	 * parallelism one.
 	 *
 	 * @param data
 	 * 		The collection of elements to create the data stream from.
@@ -772,7 +772,7 @@ public abstract class StreamExecutionEnvironment {
 	 * Creates a data stream from the given non-empty collection.
 	 *
 	 * <p>Note that this operation will result in a non-parallel data stream source,
-	 * i.e., a data stream source with a parallelism one.</p>
+	 * i.e., a data stream source with a parallelism one.
 	 *
 	 * @param data
 	 * 		The collection of elements to create the data stream from
@@ -803,10 +803,10 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * <p>Because the iterator will remain unmodified until the actual execution happens,
 	 * the type of data returned by the iterator must be given explicitly in the form of the type
-	 * class (this is due to the fact that the Java compiler erases the generic type information).</p>
+	 * class (this is due to the fact that the Java compiler erases the generic type information).
 	 *
 	 * <p>Note that this operation will result in a non-parallel data stream source, i.e.,
-	 * a data stream source with a parallelism of one.</p>
+	 * a data stream source with a parallelism of one.
 	 *
 	 * @param data
 	 * 		The iterator of elements to create the data stream from
@@ -828,10 +828,10 @@ public abstract class StreamExecutionEnvironment {
 	 * the type of data returned by the iterator must be given explicitly in the form of the type
 	 * information. This method is useful for cases where the type is generic.
 	 * In that case, the type class (as given in
-	 * {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.</p>
+	 * {@link #fromCollection(java.util.Iterator, Class)} does not supply all type information.
 	 *
 	 * <p>Note that this operation will result in a non-parallel data stream source, i.e.,
-	 * a data stream source with a parallelism one.</p>
+	 * a data stream source with a parallelism one.
 	 *
 	 * @param data
 	 * 		The iterator of elements to create the data stream from
@@ -849,12 +849,13 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
-	 * framework to create a parallel data stream source that returns the elements in the iterator.
+	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable,
+	 * allowing the framework to create a parallel data stream source that returns the elements in
+	 * the iterator.
 	 *
-	 * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
-	 * iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler
-	 * erases the generic type information).</p>
+	 * <p>Because the iterator will remain unmodified until the actual execution happens, the type
+	 * of data returned by the iterator must be given explicitly in the form of the type class
+	 * (this is due to the fact that the Java compiler erases the generic type information).
 	 *
 	 * @param iterator
 	 * 		The iterator that produces the elements of the data stream
@@ -869,13 +870,16 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
-	 * framework to create a parallel data stream source that returns the elements in the iterator.
-	 * <p>
-	 * Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
-	 * iterator must be given explicitly in the form of the type information. This method is useful for cases where the
-	 * type is generic. In that case, the type class (as given in {@link #fromParallelCollection(org.apache.flink.util.SplittableIterator,
-	 * Class)} does not supply all type information.
+	 * Creates a new data stream that contains elements in the iterator. The iterator is splittable,
+	 * allowing the framework to create a parallel data stream source that returns the elements in
+	 * the iterator.
+	 *
+	 * <p>Because the iterator will remain unmodified until the actual execution happens, the type
+	 * of data returned by the iterator must be given explicitly in the form of the type
+	 * information. This method is useful for cases where the type is generic. In that case, the
+	 * type class (as given in
+	 * {@link #fromParallelCollection(org.apache.flink.util.SplittableIterator, Class)}) does not
+	 * supply all type information.
 	 *
 	 * @param iterator
 	 * 		The iterator that produces the elements of the data stream
@@ -897,15 +901,15 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such
-	 * line. The file will be read with the system's default character set.
+	 * Reads the given file line-by-line and creates a data stream that contains a string with the
+	 * contents of each such line. The file will be read with the system's default character set.
 	 *
-	 * <p>
-	 * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
-	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
-	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
-	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
-	 * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
+	 * <p><b>NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
+	 * them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more
+	 * checkpoint barriers are going to be forwarded after the source exits, thus having no
+	 * checkpoints after that point.
 	 *
 	 * @param filePath
 	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
@@ -916,11 +920,11 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such
-	 * line. The {@link java.nio.charset.Charset} with the given name will be used to read the files.
+	 * Reads the given file line-by-line and creates a data stream that contains a string with the
+	 * contents of each such line. The {@link java.nio.charset.Charset} with the given name will be
+	 * used to read the files.
 	 *
-	 * <p>
-	 * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
+	 * <p><b>NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
 	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
 	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
 	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
@@ -947,16 +951,14 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
 	 *
-	 * <p>
-	 * Since all data streams need specific information about their types, this method needs to determine the
+	 * <p>Since all data streams need specific information about their types, this method needs to determine the
 	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
 	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
 	 * In the latter case, this method will invoke the
 	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
 	 * type produced by the input format.
 	 *
-	 * <p>
-	 * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
+	 * <p><b>NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
 	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
 	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
 	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
@@ -976,11 +978,10 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 *
 	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. Depending
 	 * on the provided {@link FileProcessingMode}.
-	 * <p>
-	 * See {@link #readFile(FileInputFormat, String, FileProcessingMode, long)}
+	 *
+	 * <p>See {@link #readFile(FileInputFormat, String, FileProcessingMode, long)}
 	 *
 	 * @param inputFormat
 	 * 		The input format used to create the data stream
@@ -1028,16 +1029,14 @@ public abstract class StreamExecutionEnvironment {
 	 * can specify a custom {@link FilePathFilter}. As a default implementation you can use
 	 * {@link FilePathFilter#createDefaultFilter()}.
 	 *
-	 * <p>
-	 * Since all data streams need specific information about their types, this method needs to determine the
+	 * <p>Since all data streams need specific information about their types, this method needs to determine the
 	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
 	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
 	 * In the latter case, this method will invoke the
 	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
 	 * type produced by the input format.
 	 *
-	 * <p>
-	 * <b> NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
+	 * <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
 	 * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
 	 * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
 	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
@@ -1108,8 +1107,7 @@ public abstract class StreamExecutionEnvironment {
 	 * the user can specify a custom {@link FilePathFilter}. As a default implementation you can use
 	 * {@link FilePathFilter#createDefaultFilter()}.
 	 *
-	 * <p>
-	 *  <b> NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
+	 * <p><b>NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
 	 * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
 	 * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
 	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
@@ -1148,8 +1146,8 @@ public abstract class StreamExecutionEnvironment {
 	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
 	 * decoded by the system's default character set. On the termination of the socket server connection retries can be
 	 * initiated.
-	 * <p>
-	 * Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when
+	 *
+	 * <p>Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when
 	 * the socket was gracefully terminated.
 	 *
 	 * @param hostname
@@ -1177,8 +1175,8 @@ public abstract class StreamExecutionEnvironment {
 	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
 	 * decoded by the system's default character set. On the termination of the socket server connection retries can be
 	 * initiated.
-	 * <p>
-	 * Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when
+	 *
+	 * <p>Let us note that the socket itself does not report on abort and as a consequence retries are only initiated when
 	 * the socket was gracefully terminated.
 	 *
 	 * @param hostname
@@ -1259,16 +1257,15 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
-	 * <p>
-	 * Since all data streams need specific information about their types, this method needs to determine the
+	 *
+	 * <p>Since all data streams need specific information about their types, this method needs to determine the
 	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
 	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
 	 * In the latter case, this method will invoke the
 	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
 	 * type produced by the input format.
 	 *
-	 * <p>
-	 * <b> NOTES ON CHECKPOINTING: </b> In the case of a {@link FileInputFormat}, the source
+	 * <p><b>NOTES ON CHECKPOINTING: </b> In the case of a {@link FileInputFormat}, the source
 	 * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the
 	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
 	 * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits,
@@ -1288,13 +1285,12 @@ public abstract class StreamExecutionEnvironment {
 
 	/**
 	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
-	 * <p>
-	 * The data stream is typed to the given TypeInformation. This method is intended for input formats
+	 *
+	 * <p>The data stream is typed to the given TypeInformation. This method is intended for input formats
 	 * where the return type cannot be determined by reflection analysis, and that do not implement the
 	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
 	 *
-	 * <p>
-	 * <b> NOTES ON CHECKPOINTING: </b> In the case of a {@link FileInputFormat}, the source
+	 * <p><b>NOTES ON CHECKPOINTING: </b> In the case of a {@link FileInputFormat}, the source
 	 * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the
 	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
 	 * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits,
@@ -1364,8 +1360,7 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Adds a Data Source to the streaming topology.
 	 *
-	 * <p>
-	 * By default sources have a parallelism of 1. To enable parallel execution, the user defined source should
+	 * <p>By default sources have a parallelism of 1. To enable parallel execution, the user defined source should
 	 * implement {@link org.apache.flink.streaming.api.functions.source.ParallelSourceFunction} or extend {@link
 	 * org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction}. In these cases the resulting source
 	 * will have the parallelism of the environment. To change this afterwards call {@link
@@ -1466,7 +1461,7 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Casts the source function into a SourceFunction implementing the StoppableFunction.
 	 *
-	 * This method should only be used if the source function was checked to implement the
+	 * <p>This method should only be used if the source function was checked to implement the
 	 * {@link StoppableFunction} interface.
 	 *
 	 * @param sourceFunction Source function to cast
@@ -1483,8 +1478,8 @@ public abstract class StreamExecutionEnvironment {
 	 * Triggers the program execution. The environment will execute all parts of
 	 * the program that have resulted in a "sink" operation. Sink operations are
 	 * for example printing results or forwarding them to a message queue.
-	 * <p>
-	 * The program execution will be logged and displayed with a generated
+	 *
+	 * <p>The program execution will be logged and displayed with a generated
 	 * default name.
 	 *
 	 * @return The result of the job execution, containing elapsed time and accumulators.
@@ -1498,8 +1493,8 @@ public abstract class StreamExecutionEnvironment {
 	 * Triggers the program execution. The environment will execute all parts of
 	 * the program that have resulted in a "sink" operation. Sink operations are
 	 * for example printing results or forwarding them to a message queue.
-	 * <p>
-	 * The program execution will be logged and displayed with the provided name
+	 *
+	 * <p>The program execution will be logged and displayed with the provided name.
 	 *
 	 * @param jobName
 	 * 		Desired name of the job
@@ -1550,12 +1545,10 @@ public abstract class StreamExecutionEnvironment {
 	 * Adds an operator to the list of operators that should be executed when calling
 	 * {@link #execute}.
 	 *
-	 * <p>
-	 * When calling {@link #execute()} only the operators that where previously added to the list
+	 * <p>When calling {@link #execute()} only the operators that were previously added to the list
 	 * are executed.
 	 *
-	 * <p>
-	 * This is not meant to be used by users. The API methods that create operators must call
+	 * <p>This is not meant to be used by users. The API methods that create operators must call
 	 * this method.
 	 */
 	@Internal

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index b1521f5..50f2ed3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -26,6 +26,10 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
+/**
+ * A special {@link StreamExecutionEnvironment} that is used in the web frontend when generating
+ * a user-inspectable graph of a streaming job.
+ */
 @PublicEvolving
 public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
index d13e461..4b6d52a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractor.java
@@ -25,7 +25,7 @@ import org.apache.flink.annotation.PublicEvolving;
  * ascending. In this case, the local watermarks for the streams are easy to generate, because
  * they strictly follow the timestamps.
  *
- * <b>Note:</b> This is just a deprecated stub class. The actual code for this has moved to
+ * <p><b>Note:</b> This is just a deprecated stub class. The actual code for this has moved to
  * {@link org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor}.
  *
  * @param <T> The type of the elements that this function can extract timestamps from

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
index b220d07..2dc9091 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimestampExtractor.java
@@ -24,14 +24,12 @@ import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExt
 /**
  * Interface for user functions that extract timestamps from elements.
  *
- * <p>
- * The extractor must also keep track of the current watermark. The system will periodically
- * retrieve this watermark using {@link #getCurrentWatermark()} and submit it throughout the topology.
+ * <p>The extractor must also keep track of the current watermark. The system will periodically
+ * retrieve this watermark using {@link #getCurrentWatermark()} and submit it throughout the
+ * topology.
  *
- * <p>
- * Note: If you know that timestamps are monotonically increasing you can use
- * {@link AscendingTimestampExtractor}. This will
- * keep track of watermarks.
+ * <p>Note: If you know that timestamps are monotonically increasing you can use
+ * {@link AscendingTimestampExtractor}. This will keep track of watermarks.
  *
  * @param <T> The type of the elements that this function can extract timestamps from
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
index 57c48a6..e6ae93f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/AggregationFunction.java
@@ -20,12 +20,17 @@ package org.apache.flink.streaming.api.functions.aggregation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.ReduceFunction;
 
+/**
+ * Holder class for aggregation types that can be used on a windowed stream or keyed stream.
+ */
 @Internal
 public abstract class AggregationFunction<T> implements ReduceFunction<T> {
 	private static final long serialVersionUID = 1L;
 
+	/**
+	 * Aggregation types that can be used on a windowed stream or keyed stream.
+	 */
 	public enum AggregationType {
 		SUM, MIN, MAX, MINBY, MAXBY,
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
index c634434..3216e3a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@@ -23,6 +23,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.util.typeutils.FieldAccessor;
 import org.apache.flink.streaming.util.typeutils.FieldAccessorFactory;
 
+/**
+ * An {@link AggregationFunction} that computes values based on comparisons of
+ * {@link Comparable Comparables}.
+ */
 @Internal
 public class ComparableAggregator<T> extends AggregationFunction<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
index 14e1764..9202a49 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
@@ -22,6 +22,9 @@ import java.io.Serializable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
 
+/**
+ * Internal comparator implementation, for use with {@link ComparableAggregator}.
+ */
 @Internal
 public abstract class Comparator implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
index 5e1378e..58b5981 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
@@ -26,6 +26,9 @@ import org.apache.flink.streaming.util.typeutils.FieldAccessorFactory;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.util.typeutils.FieldAccessor;
 
+/**
+ * An {@link AggregationFunction} that sums up fields.
+ */
 @Internal
 public class SumAggregator<T> extends AggregationFunction<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
index 124b726..a177c58 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumFunction.java
@@ -21,8 +21,11 @@ import org.apache.flink.annotation.Internal;
 
 import java.io.Serializable;
 
+/**
+ * Internal function for summing up contents of fields. This is used with {@link SumAggregator}.
+ */
 @Internal
-public abstract class SumFunction implements Serializable{
+public abstract class SumFunction implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
@@ -48,7 +51,7 @@ public abstract class SumFunction implements Serializable{
 		}
 	}
 
-	public static class IntSum extends SumFunction {
+	static class IntSum extends SumFunction {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -57,7 +60,7 @@ public abstract class SumFunction implements Serializable{
 		}
 	}
 
-	public static class LongSum extends SumFunction {
+	static class LongSum extends SumFunction {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -66,7 +69,7 @@ public abstract class SumFunction implements Serializable{
 		}
 	}
 
-	public static class DoubleSum extends SumFunction {
+	static class DoubleSum extends SumFunction {
 
 		private static final long serialVersionUID = 1L;
 
@@ -76,7 +79,7 @@ public abstract class SumFunction implements Serializable{
 		}
 	}
 
-	public static class ShortSum extends SumFunction {
+	static class ShortSum extends SumFunction {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -85,7 +88,7 @@ public abstract class SumFunction implements Serializable{
 		}
 	}
 
-	public static class FloatSum extends SumFunction {
+	static class FloatSum extends SumFunction {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -94,7 +97,7 @@ public abstract class SumFunction implements Serializable{
 		}
 	}
 
-	public static class ByteSum extends SumFunction {
+	static class ByteSum extends SumFunction {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 8c788b4..552f009 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -57,11 +57,10 @@ import java.util.Map;
  * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
  * {@link RichFunction#close()}.
  *
- * <p>
- * State related apis in {@link RuntimeContext} are not supported yet because the key may get
+ * <p>State related apis in {@link RuntimeContext} are not supported yet because the key may get
  * changed while accessing states in the working thread.
- * <p>
- * {@link IterationRuntimeContext#getIterationAggregator(String)} is not supported since the
+ *
+ * <p>{@link IterationRuntimeContext#getIterationAggregator(String)} is not supported since the
  * aggregator may be modified by multiple threads.
  *
  * @param <IN> The type of the input elements.

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
index 25078ae..964c13a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/collector/AsyncCollector.java
@@ -31,19 +31,18 @@ import java.util.Collection;
 public interface AsyncCollector<OUT> {
 	/**
 	 * Set result.
-	 * <p>
-	 * Note that it should be called for exactly one time in the user code.
+	 *
+	 * <p>Note that it should be called exactly once in the user code.
 	 * Calling this function multiple times will cause data loss.
-	 * <p>
-	 * Put all results in a {@link Collection} and then issue
-	 * {@link AsyncCollector#collect(Collection)}.
+	 *
+	 * <p>Put all results in a {@link Collection} and then emit output.
 	 *
 	 * @param result A list of results.
 	 */
 	void collect(Collection<OUT> result);
 
 	/**
-	 * Set error
+	 * Set error.
 	 *
 	 * @param error A Throwable object.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
index 8811e32..4285cc3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -51,7 +51,7 @@ public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunct
 	/**
 	 * This method is called for each element in the first of the connected streams.
 	 *
-	 * This function can output zero or more elements using the {@link Collector} parameter
+	 * <p>This function can output zero or more elements using the {@link Collector} parameter
 	 * and also update internal state or set timers using the {@link Context} parameter.
 	 * 
 	 * @param value The stream element
@@ -68,7 +68,7 @@ public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunct
 	/**
 	 * This method is called for each element in the second of the connected streams.
 	 *
-	 * This function can output zero or more elements using the {@link Collector} parameter
+	 * <p>This function can output zero or more elements using the {@link Collector} parameter
 	 * and also update internal state or set timers using the {@link Context} parameter.
 	 * 
 	 * @param value The stream element

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
index 643c06c..64c38b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.api.functions.sink;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 
+/**
+ * A {@link org.apache.flink.api.common.functions.RichFunction} version of {@link SinkFunction}.
+ */
 @Public
 public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 807735d..0d0669a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -34,11 +34,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
- * <p>
- * The sink can be set to retry message sends after the sending failed.
- * <p>
- * The sink can be set to 'autoflush', in which case the socket stream is flushed after every message. This
- * significantly reduced throughput, but also decreases message latency.
+ *
+ * <p>The sink can be set to retry message sends after the sending failed.
+ *
+ * <p>The sink can be set to 'autoflush', in which case the socket stream is flushed after every
+ * message. This significantly reduces throughput, but also decreases message latency.
  *
  * @param <IN> data to be written into the Socket.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
index d445be0..65915af 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/WriteFormat.java
@@ -35,7 +35,7 @@ public abstract class WriteFormat<IN> implements Serializable {
 
 	/**
 	 * Writes the contents of tupleList to the file specified by path.
-	 * 
+	 *
 	 * @param path
 	 *            is the path to the location where the tuples are written
 	 * @param tupleList

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 8b82d84..8694135 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -56,10 +56,10 @@ import java.util.TreeMap;
  *     <li>Assigning them to downstream tasks for further processing.</li>
  * </ol>
  *
- * The splits to be read are forwarded to the downstream {@link ContinuousFileReaderOperator}
+ * <p>The splits to be read are forwarded to the downstream {@link ContinuousFileReaderOperator}
  * which can have parallelism greater than one.
  *
- * <b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in ascending modification time order,
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in ascending modification time order,
  * based on the modification time of the files they belong to.
  */
 @Internal
@@ -72,6 +72,7 @@ public class ContinuousFileMonitoringFunction<OUT>
 
 	/**
 	 * The minimum interval allowed between consecutive path scans.
+	 *
 	 * <p><b>NOTE:</b> Only applicable to the {@code PROCESS_CONTINUOUSLY} mode.
 	 */
 	public static final long MIN_MONITORING_INTERVAL = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index ff941f1..8cee0bc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -33,6 +33,9 @@ import java.util.List;
 import java.util.Map;
 
 /**
+ * A {@link SourceFunction} that monitors a directory and sends events downstream when it detects
+ * new files. Used together with {@link FileReadFunction}.
+ *
  * @deprecated Internal class deprecated in favour of {@link ContinuousFileMonitoringFunction}.
  */
 @Internal
@@ -42,6 +45,9 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 
 	private static final Logger LOG = LoggerFactory.getLogger(FileMonitoringFunction.class);
 
+	/**
+	 * The watch type of the {@code FileMonitoringFunction}.
+	 */
 	public enum WatchType {
 		ONLY_NEW_FILES, // Only new files will be processed.
 		REPROCESS_WITH_APPENDED, // When some files are appended, all contents

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
index ff8a295..172f7a3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
@@ -30,6 +30,9 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 
 /**
+ * This is used together with {@link FileMonitoringFunction} to read from files that the
+ * monitoring function discovers.
+ *
  * @deprecated Internal class deprecated in favour of {@link ContinuousFileMonitoringFunction}.
  */
 @Internal

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index e294cae..68a1750 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -45,8 +45,7 @@ import java.util.List;
  * That way, any object transport using Java serialization will not be affected by the serializability
  * of the elements.</p>
  *
- * <p>
- * <b>NOTE:</b> This source has a parallelism of 1.
+ * <p><b>NOTE:</b> This source has a parallelism of 1.
  *
  * @param <T> The type of elements returned by this function.
  */
@@ -55,22 +54,22 @@ public class FromElementsFunction<T> implements SourceFunction<T>, CheckpointedF
 	
 	private static final long serialVersionUID = 1L;
 
-	/** The (de)serializer to be used for the data elements */
+	/** The (de)serializer to be used for the data elements. */
 	private final TypeSerializer<T> serializer;
 	
-	/** The actual data elements, in serialized form */
+	/** The actual data elements, in serialized form. */
 	private final byte[] elementsSerialized;
 	
-	/** The number of serialized elements */
+	/** The number of serialized elements. */
 	private final int numElements;
 
-	/** The number of elements emitted already */
+	/** The number of elements emitted already. */
 	private volatile int numElementsEmitted;
 
-	/** The number of elements to skip initially */
+	/** The number of elements to skip initially. */
 	private volatile int numElementsToSkip;
 	
-	/** Flag to make the source cancelable */
+	/** Flag to make the source cancelable. */
 	private volatile boolean isRunning = true;
 
 	private transient ListState<Integer> checkpointedState;

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
index e721bf3..a8b527f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromIteratorFunction.java
@@ -21,6 +21,9 @@ import org.apache.flink.annotation.PublicEvolving;
 
 import java.util.Iterator;
 
+/**
+ * A {@link SourceFunction} that reads elements from an {@link Iterator} and emits them.
+ */
 @PublicEvolving
 public class FromIteratorFunction<T> implements SourceFunction<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
index dc97b1a..db6c8a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FromSplittableIteratorFunction.java
@@ -23,6 +23,9 @@ import org.apache.flink.util.SplittableIterator;
 
 import java.util.Iterator;
 
+/**
+ * A {@link SourceFunction} that reads elements from a {@link SplittableIterator} and emits them.
+ */
 @PublicEvolving
 public class FromSplittableIteratorFunction<T> extends RichParallelSourceFunction<T> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
index c067ca7..9592c88 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java
@@ -32,6 +32,9 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+/**
+ * A {@link SourceFunction} that reads data using an {@link InputFormat}.
+ */
 @Internal
 public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index e835070..3da1ec3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -44,27 +44,27 @@ import org.slf4j.LoggerFactory;
 /**
  * Abstract base class for data sources that receive elements from a message queue and
  * acknowledge them back by IDs.
- * <p>
- * The mechanism for this source assumes that messages are identified by a unique ID.
+ *
+ * <p>The mechanism for this source assumes that messages are identified by a unique ID.
  * When messages are taken from the message queue, the message must not be dropped immediately,
  * but must be retained until acknowledged. Messages that are not acknowledged within a certain
  * time interval will be served again (to a different connection, established by the recovered source).
- * <p>
- * Note that this source can give no guarantees about message order in the case of failures,
+ *
+ * <p>Note that this source can give no guarantees about message order in the case of failures,
  * because messages that were retrieved but not yet acknowledged will be returned later again, after
  * a set of messages that was not retrieved before the failure.
- * <p>
- * Internally, this source gathers the IDs of elements it emits. Per checkpoint, the IDs are stored and
+ *
+ * <p>Internally, this source gathers the IDs of elements it emits. Per checkpoint, the IDs are stored and
  * acknowledged when the checkpoint is complete. That way, no message is acknowledged unless it is certain
  * that it has been successfully processed throughout the topology and the updates to any state caused by
  * that message are persistent.
- * <p>
- * All messages that are emitted and successfully processed by the streaming program will eventually be
+ *
+ * <p>All messages that are emitted and successfully processed by the streaming program will eventually be
  * acknowledged. In corner cases, the source may receive certain IDs multiple times, if a
  * failure occurs while acknowledging. To cope with this situation, an additional Set stores all
  * processed IDs. IDs are only removed after they have been acknowledged.
- * <p>
- * A typical way to use this base in a source function is by implementing a run() method as follows:
+ *
+ * <p>A typical way to use this base in a source function is by implementing a run() method as follows:
  * <pre>{@code
  * public void run(SourceContext<Type> ctx) throws Exception {
  *     while (running) {
@@ -91,13 +91,16 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 
 	private static final Logger LOG = LoggerFactory.getLogger(MessageAcknowledgingSourceBase.class);
 
-	/** Serializer used to serialize the IDs for checkpoints */
+	/** Serializer used to serialize the IDs for checkpoints. */
 	private final TypeSerializer<UId> idSerializer;
 
-	/** The list gathering the IDs of messages emitted during the current checkpoint */
+	/** The list gathering the IDs of messages emitted during the current checkpoint. */
 	private transient List<UId> idsForCurrentCheckpoint;
 
-	/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
+	/**
+	 * The list with IDs from checkpoints that were triggered, but not yet completed or notified of
+	 * completion.
+	 */
 	protected transient ArrayDeque<Tuple2<Long, List<UId>>> pendingCheckpoints;
 
 	/**
@@ -178,6 +181,7 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
 
 	/**
 	 * This method must be implemented to acknowledge the given set of IDs back to the message queue.
+	 *
 	 * @param uIds The list of IDs to acknowledge.
 	 */
 	protected abstract void acknowledgeIDs(long checkpointId, List<UId> uIds);

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
index 188fdce..965dc02 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
@@ -37,10 +37,12 @@ import java.util.List;
  * acknowledge them back by IDs. In contrast to {@link MessageAcknowledgingSourceBase}, this source
  * handles two types of ids:
  *
- * 1) Session ids
- * 2) Unique message ids
+ * <ol>
+ *   <li>Session ids
+ *   <li>Unique message ids
+ * </ol>
  *
- * Session ids are used to acknowledge messages in a session. When a checkpoint is restored,
+ * <p>Session ids are used to acknowledge messages in a session. When a checkpoint is restored,
  * unacknowledged messages are redelivered. Duplicates are detected using the unique message ids
  * which are checkpointed.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/2581a7b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
index d1fea1e..8d77b15 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -32,14 +32,15 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A source function that reads strings from a socket. The source will read bytes from the socket stream
- * and convert them to characters, each byte individually. When the delimiter character is received,
- * the function will output the current string, and begin a new string.
- * <p>
- * The function strips trailing <i>carriage return</i> characters (\r) when the delimiter is the
+ * A source function that reads strings from a socket. The source will read bytes from the socket
+ * stream and convert them to characters, each byte individually. When the delimiter character is
+ * received, the function will output the current string, and begin a new string.
+ *
+ * <p>The function strips trailing <i>carriage return</i> characters (\r) when the delimiter is the
  * newline character (\n).
- * <p>
- * The function can be set to reconnect to the server socket in case that the stream is closed on the server side.
+ *
+ * <p>The function can be set to reconnect to the server socket in case that the stream is closed on
+ * the server side.
  */
 @PublicEvolving
 public class SocketTextStreamFunction implements SourceFunction<String> {
@@ -48,10 +49,10 @@ public class SocketTextStreamFunction implements SourceFunction<String> {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
 
-	/** Default delay between successive connection attempts */
+	/** Default delay between successive connection attempts. */
 	private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
 
-	/** Default connection timeout when connecting to the server socket (infinite) */
+	/** Default connection timeout when connecting to the server socket (infinite). */
 	private static final int CONNECTION_TIMEOUT_TIME = 0;
 	
 	


Mime
View raw message