flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [02/22] flink git commit: [FLINK-6731] [tests] Activate strict checkstyle for flink-tests
Date Wed, 12 Jul 2017 23:44:02 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index 765eae5..f73bf42 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -20,31 +20,33 @@ package org.apache.flink.test.streaming.runtime;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -400,7 +402,6 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
 		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
 	}
 
-
 	/**
 	 * Test ProcessFunction side outputs with wrong {@code OutputTag}.
 	 */
@@ -450,7 +451,7 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
 
 	private static class TestKeySelector implements KeySelector<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public Integer getKey(Integer value) throws Exception {
 			return value;
@@ -458,7 +459,7 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
 	}
 
 	/**
-	 * Test window late arriving events stream
+	 * Test window late arriving events stream.
 	 */
 	@Test
 	public void testAllWindowLateArrivingEvents() throws Exception {
@@ -478,10 +479,10 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
 				.sideOutputLateData(lateDataTag)
 				.apply(new AllWindowFunction<Integer, Integer, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
-					
+
 					@Override
 					public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
-							for(Integer val : values) {
+							for (Integer val : values) {
 								out.collect(val);
 							}
 					}
@@ -528,7 +529,7 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
 
 					@Override
 					public void apply(Integer key, TimeWindow window, Iterable<Integer> input, Collector<String> out) throws Exception {
-						for(Integer val : input) {
+						for (Integer val : input) {
 							out.collect(String.valueOf(key) + "-" + String.valueOf(val));
 						}
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index ce342c0..f5c769d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
 import org.junit.Test;
 
 import java.io.IOException;
@@ -42,6 +43,9 @@ import java.io.IOException;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Integration tests for {@link OperatorStateBackend}.
+ */
 public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 
 	/**
@@ -58,10 +62,9 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		see.getConfig().setRestartStrategy(RestartStrategies.noRestart());
 		see.setStateBackend(new FailingStateBackend());
 
-
 		see.fromElements(new Tuple2<>("Hello", 1))
 			.keyBy(0)
-			.map(new RichMapFunction<Tuple2<String,Integer>, String>() {
+			.map(new RichMapFunction<Tuple2<String, Integer>, String>() {
 				private static final long serialVersionUID = 1L;
 
 				@Override
@@ -87,7 +90,7 @@ public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-	public static class FailingStateBackend extends AbstractStateBackend {
+	private static class FailingStateBackend extends AbstractStateBackend {
 		private static final long serialVersionUID = 1L;
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index 48e6fae..cb31f2b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -29,8 +29,8 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.TimerException;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
@@ -46,27 +46,25 @@ import java.util.concurrent.Semaphore;
 /**
  * Tests for the timer service of {@code StreamTask}.
  *
- * <p>
- * These tests ensure that exceptions are properly forwarded from the timer thread to
+ * <p>These tests ensure that exceptions are properly forwarded from the timer thread to
  * the task thread and that operator methods are not invoked concurrently.
  */
 @RunWith(Parameterized.class)
 public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 
 	private final TimeCharacteristic timeCharacteristic;
-	
+
 	public StreamTaskTimerITCase(TimeCharacteristic characteristic) {
 		timeCharacteristic = characteristic;
 	}
 
-
 	/**
 	 * Note: this test fails if we don't check for exceptions in the source contexts and do not
 	 * synchronize in the source contexts.
 	 */
 	@Test
 	public void testOperatorChainedToSource() throws Exception {
-		
+
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(timeCharacteristic);
 		env.setParallelism(1);
@@ -134,7 +132,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 		}
 		Assert.assertTrue(testSuccess);
 	}
-	
+
 	@Test
 	public void testTwoInputOperatorWithoutChaining() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -171,7 +169,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(testSuccess);
 	}
 
-	public static class TimerOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, ProcessingTimeCallback {
+	private static class TimerOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, ProcessingTimeCallback {
 		private static final long serialVersionUID = 1L;
 
 		int numTimers = 0;
@@ -196,7 +194,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 				first = false;
 			}
 			numElements++;
-			
+
 			semaphore.release();
 		}
 
@@ -230,7 +228,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-	public static class TwoInputTimerOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String>, ProcessingTimeCallback {
+	private static class TwoInputTimerOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String>, ProcessingTimeCallback {
 		private static final long serialVersionUID = 1L;
 
 		int numTimers = 0;
@@ -274,7 +272,6 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			semaphore.release();
 		}
 
-
 		@Override
 		public void onProcessingTime(long time) throws Exception {
 			if (!semaphore.tryAcquire()) {
@@ -307,7 +304,6 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-
 	private static class InfiniteTestSource implements SourceFunction<String> {
 		private static final long serialVersionUID = 1L;
 		private volatile boolean running = true;
@@ -324,7 +320,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			running = false;
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  parametrization
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 45686ef..9b1dd2a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -32,12 +32,12 @@ import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -75,7 +75,6 @@ public class TimestampITCase extends TestLogger {
 	// this is used in some tests to synchronize
 	static MultiShotLatch latch;
 
-
 	private static LocalFlinkMiniCluster cluster;
 
 	@Before
@@ -84,7 +83,6 @@ public class TimestampITCase extends TestLogger {
 		latch = new MultiShotLatch();
 	}
 
-
 	@BeforeClass
 	public static void startCluster() {
 		Configuration config = new Configuration();
@@ -111,28 +109,26 @@ public class TimestampITCase extends TestLogger {
 	 * These check whether custom timestamp emission works at sources and also whether timestamps
 	 * arrive at operators throughout a topology.
 	 *
-	 * <p>
-	 * This also checks whether watermarks keep propagating if a source closes early.
+	 * <p>This also checks whether watermarks keep propagating if a source closes early.
 	 *
-	 * <p>
-	 * This only uses map to test the workings of watermarks in a complete, running topology. All
+	 * <p>This only uses map to test the workings of watermarks in a complete, running topology. All
 	 * tasks and stream operators have dedicated tests that test the watermark propagation
 	 * behaviour.
 	 */
 	@Test
 	public void testWatermarkPropagation() throws Exception {
-		final int NUM_WATERMARKS = 10;
+		final int numWatermarks = 10;
 
 		long initialTime = 0L;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		
+
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
-		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
-		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS / 2));
+		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, numWatermarks));
+		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, numWatermarks / 2));
 
 		source1.union(source2)
 				.map(new IdentityMap())
@@ -146,31 +142,31 @@ public class TimestampITCase extends TestLogger {
 		for (int i = 0; i < PARALLELISM; i++) {
 			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
 			// other source stops emitting after that
-			for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
+			for (int j = 0; j < numWatermarks / 2; j++) {
 				if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
 					System.err.println("All Watermarks: ");
-					for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
+					for (int k = 0; k <= numWatermarks / 2; k++) {
 						System.err.println(CustomOperator.finalWatermarks[i].get(k));
 					}
 
 					fail("Wrong watermark.");
 				}
 			}
-			
+
 			assertEquals(Watermark.MAX_WATERMARK,
-					CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size()-1));
+					CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size() - 1));
 		}
 	}
 
 	@Test
 	public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {
-		
+
 		// for this test to work, we need to be sure that no other jobs are being executed
 		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
 			Thread.sleep(100);
 		}
-		
-		final int NUM_WATERMARKS = 10;
+
+		final int numWatermarks = 10;
 
 		long initialTime = 0L;
 
@@ -180,8 +176,8 @@ public class TimestampITCase extends TestLogger {
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
-		DataStream<Integer> source1 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS));
-		DataStream<Integer> source2 = env.addSource(new MyTimestampSourceInfinite(initialTime, NUM_WATERMARKS / 2));
+		DataStream<Integer> source1 = env.addSource(new MyTimestampSourceInfinite(initialTime, numWatermarks));
+		DataStream<Integer> source2 = env.addSource(new MyTimestampSourceInfinite(initialTime, numWatermarks / 2));
 
 		source1.union(source2)
 				.map(new IdentityMap())
@@ -200,7 +196,7 @@ public class TimestampITCase extends TestLogger {
 					}
 
 					JobID id = running.get(0);
-					
+
 					// send stop until the job is stopped
 					do {
 						try {
@@ -210,7 +206,7 @@ public class TimestampITCase extends TestLogger {
 							if (e.getCause() instanceof IllegalStateException) {
 								// this means the job is not yet ready to be stopped,
 								// for example because it is still in CREATED state
-								// we ignore the exception 
+								// we ignore the exception
 							} else {
 								// other problem
 								throw e;
@@ -225,29 +221,29 @@ public class TimestampITCase extends TestLogger {
 				}
 			}
 		}.start();
-		
+
 		env.execute();
 
 		// verify that all the watermarks arrived at the final custom operator
 		for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) {
-			
+
 			// we are only guaranteed to see NUM_WATERMARKS / 2 watermarks because the
 			// other source stops emitting after that
 			for (int j = 0; j < subtaskWatermarks.size(); j++) {
 				if (subtaskWatermarks.get(j).getTimestamp() != initialTime + j) {
 					System.err.println("All Watermarks: ");
-					for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
+					for (int k = 0; k <= numWatermarks / 2; k++) {
 						System.err.println(subtaskWatermarks.get(k));
 					}
 
 					fail("Wrong watermark.");
 				}
 			}
-			
+
 			// if there are watermarks, the final one must not be the MAX watermark
 			if (subtaskWatermarks.size() > 0) {
 				assertNotEquals(Watermark.MAX_WATERMARK,
-						subtaskWatermarks.get(subtaskWatermarks.size()-1));
+						subtaskWatermarks.get(subtaskWatermarks.size() - 1));
 			}
 		}
 	}
@@ -258,8 +254,7 @@ public class TimestampITCase extends TestLogger {
 	 */
 	@Test
 	public void testTimestampHandling() throws Exception {
-		final int NUM_ELEMENTS = 10;
-
+		final int numElements = 10;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -267,8 +262,8 @@ public class TimestampITCase extends TestLogger {
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
 
-		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
-		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
+		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, numElements));
+		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, numElements));
 
 		source1
 				.map(new IdentityMap())
@@ -276,7 +271,6 @@ public class TimestampITCase extends TestLogger {
 				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator())
 				.addSink(new DiscardingSink<Integer>());
 
-
 		env.execute();
 	}
 
@@ -285,23 +279,23 @@ public class TimestampITCase extends TestLogger {
 	 */
 	@Test
 	public void testDisabledTimestamps() throws Exception {
-		final int NUM_ELEMENTS = 10;
-		
+		final int numElements = 10;
+
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		
+
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 		env.setParallelism(PARALLELISM);
 		env.getConfig().disableSysoutLogging();
-		
-		DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
-		DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
+
+		DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(numElements));
+		DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(numElements));
 
 		source1
 				.map(new IdentityMap())
 				.connect(source2).map(new IdentityCoMap())
 				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator())
 				.addSink(new DiscardingSink<Integer>());
-		
+
 		env.execute();
 	}
 
@@ -312,7 +306,7 @@ public class TimestampITCase extends TestLogger {
 	 */
 	@Test
 	public void testTimestampExtractorWithAutoInterval() throws Exception {
-		final int NUM_ELEMENTS = 10;
+		final int numElements = 10;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -321,12 +315,11 @@ public class TimestampITCase extends TestLogger {
 		env.setParallelism(1);
 		env.getConfig().disableSysoutLogging();
 
-
 		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
 			@Override
 			public void run(SourceContext<Integer> ctx) throws Exception {
 				int index = 1;
-				while (index <= NUM_ELEMENTS) {
+				while (index <= numElements) {
 					ctx.collect(index);
 					latch.await();
 					index++;
@@ -357,15 +350,15 @@ public class TimestampITCase extends TestLogger {
 		env.execute();
 
 		// verify that we get NUM_ELEMENTS watermarks
-		for (int j = 0; j < NUM_ELEMENTS; j++) {
+		for (int j = 0; j < numElements; j++) {
 			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
 				long wm = CustomOperator.finalWatermarks[0].get(j).getTimestamp();
 				Assert.fail("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]);
 			}
 		}
-		
+
 		// the input is finite, so it should have a MAX Watermark
-		assertEquals(Watermark.MAX_WATERMARK, 
+		assertEquals(Watermark.MAX_WATERMARK,
 				CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
 	}
 
@@ -376,7 +369,7 @@ public class TimestampITCase extends TestLogger {
 	 */
 	@Test
 	public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception {
-		final int NUM_ELEMENTS = 10;
+		final int numElements = 10;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -385,12 +378,11 @@ public class TimestampITCase extends TestLogger {
 		env.setParallelism(1);
 		env.getConfig().disableSysoutLogging();
 
-
 		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
 			@Override
 			public void run(SourceContext<Integer> ctx) throws Exception {
 				int index = 1;
-				while (index <= NUM_ELEMENTS) {
+				while (index <= numElements) {
 					ctx.collect(index);
 					latch.await();
 					index++;
@@ -403,7 +395,7 @@ public class TimestampITCase extends TestLogger {
 
 		source1
 				.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Integer>() {
-					
+
 					@Override
 					public long extractTimestamp(Integer element, long currentTimestamp) {
 						return element;
@@ -417,11 +409,10 @@ public class TimestampITCase extends TestLogger {
 				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
 				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
 
-
 		env.execute();
 
 		// verify that we get NUM_ELEMENTS watermarks
-		for (int j = 0; j < NUM_ELEMENTS; j++) {
+		for (int j = 0; j < numElements; j++) {
 			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
 				Assert.fail("Wrong watermark.");
 			}
@@ -433,12 +424,11 @@ public class TimestampITCase extends TestLogger {
 	}
 
 	/**
-	 * This test verifies that the timestamp extractor does not emit decreasing watermarks even
-	 *
+	 * This test verifies that the timestamp extractor does not emit decreasing watermarks.
 	 */
 	@Test
 	public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception {
-		final int NUM_ELEMENTS = 10;
+		final int numElements = 10;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -447,12 +437,11 @@ public class TimestampITCase extends TestLogger {
 		env.setParallelism(1);
 		env.getConfig().disableSysoutLogging();
 
-
 		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
 			@Override
 			public void run(SourceContext<Integer> ctx) throws Exception {
 				int index = 1;
-				while (index <= NUM_ELEMENTS) {
+				while (index <= numElements) {
 					ctx.collect(index);
 					Thread.sleep(100);
 					ctx.collect(index - 1);
@@ -481,11 +470,10 @@ public class TimestampITCase extends TestLogger {
 				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
 				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
 
-
 		env.execute();
 
 		// verify that we get NUM_ELEMENTS watermarks
-		for (int j = 0; j < NUM_ELEMENTS; j++) {
+		for (int j = 0; j < numElements; j++) {
 			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j))) {
 				Assert.fail("Wrong watermark.");
 			}
@@ -500,7 +488,7 @@ public class TimestampITCase extends TestLogger {
 	 */
 	@Test
 	public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception {
-		final int NUM_ELEMENTS = 10;
+		final int numElements = 10;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -509,16 +497,15 @@ public class TimestampITCase extends TestLogger {
 		env.setParallelism(2);
 		env.getConfig().disableSysoutLogging();
 
-
 		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
 			@Override
 			public void run(SourceContext<Integer> ctx) throws Exception {
 				int index = 1;
-				while (index <= NUM_ELEMENTS) {
+				while (index <= numElements) {
 					ctx.collectWithTimestamp(index, index);
 					ctx.collectWithTimestamp(index - 1, index - 1);
 					index++;
-					ctx.emitWatermark(new Watermark(index-2));
+					ctx.emitWatermark(new Watermark(index - 2));
 				}
 
 				// emit the final Long.MAX_VALUE watermark, do it twice and verify that
@@ -546,7 +533,6 @@ public class TimestampITCase extends TestLogger {
 				})
 			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
 
-
 		env.execute();
 
 		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
@@ -555,12 +541,12 @@ public class TimestampITCase extends TestLogger {
 
 	/**
 	 * This test verifies that the timestamp extractor forwards Long.MAX_VALUE watermarks.
-	 * 
-	 * Same test as before, but using a different timestamp extractor
+	 *
+	 * <p>Same test as before, but using a different timestamp extractor.
 	 */
 	@Test
 	public void testTimestampExtractorWithLongMaxWatermarkFromSource2() throws Exception {
-		final int NUM_ELEMENTS = 10;
+		final int numElements = 10;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment
 				.getExecutionEnvironment();
@@ -574,11 +560,11 @@ public class TimestampITCase extends TestLogger {
 			@Override
 			public void run(SourceContext<Integer> ctx) throws Exception {
 				int index = 1;
-				while (index <= NUM_ELEMENTS) {
+				while (index <= numElements) {
 					ctx.collectWithTimestamp(index, index);
 					ctx.collectWithTimestamp(index - 1, index - 1);
 					index++;
-					ctx.emitWatermark(new Watermark(index-2));
+					ctx.emitWatermark(new Watermark(index - 2));
 				}
 
 				// emit the final Long.MAX_VALUE watermark, do it twice and verify that
@@ -605,7 +591,7 @@ public class TimestampITCase extends TestLogger {
 					}
 				})
 				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true));
-		
+
 		env.execute();
 
 		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
@@ -618,9 +604,9 @@ public class TimestampITCase extends TestLogger {
 	 */
 	@Test
 	public void testEventTimeSourceWithProcessingTime() throws Exception {
-		StreamExecutionEnvironment env = 
+		StreamExecutionEnvironment env =
 				StreamExecutionEnvironment.getExecutionEnvironment();
-		
+
 		env.setParallelism(2);
 		env.getConfig().disableSysoutLogging();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
@@ -637,7 +623,7 @@ public class TimestampITCase extends TestLogger {
 		// other tests, so it normally emits watermarks
 		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 0);
 	}
-	
+
 	@Test
 	public void testErrorOnEventTimeOverProcessingTime() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -646,7 +632,7 @@ public class TimestampITCase extends TestLogger {
 		env.getConfig().disableSysoutLogging();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		DataStream<Tuple2<String, Integer>> source1 = 
+		DataStream<Tuple2<String, Integer>> source1 =
 				env.fromElements(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
 
 		source1
@@ -701,9 +687,9 @@ public class TimestampITCase extends TestLogger {
 	// ------------------------------------------------------------------------
 	//  Custom Operators and Functions
 	// ------------------------------------------------------------------------
-	
+
 	@SuppressWarnings("unchecked")
-	public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
+	private static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
 
 		List<Watermark> watermarks;
 		public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
@@ -749,7 +735,7 @@ public class TimestampITCase extends TestLogger {
 		}
 	}
 
-	public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
+	private static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
 
 		public TimestampCheckingOperator() {
 			setChainingStrategy(ChainingStrategy.ALWAYS);
@@ -764,7 +750,7 @@ public class TimestampITCase extends TestLogger {
 		}
 	}
 
-	public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
+	private static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
 
 		@Override
 		public void processElement(StreamRecord<Integer> element) throws Exception {
@@ -775,7 +761,7 @@ public class TimestampITCase extends TestLogger {
 		}
 	}
 
-	public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {
+	private static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {
 		@Override
 		public Integer map1(Integer value) throws Exception {
 			return value;
@@ -787,14 +773,14 @@ public class TimestampITCase extends TestLogger {
 		}
 	}
 
-	public static class IdentityMap implements MapFunction<Integer, Integer> {
+	private static class IdentityMap implements MapFunction<Integer, Integer> {
 		@Override
 		public Integer map(Integer value) throws Exception {
 			return value;
 		}
 	}
 
-	public static class MyTimestampSource implements SourceFunction<Integer> {
+	private static class MyTimestampSource implements SourceFunction<Integer> {
 
 		private final long initialTime;
 		private final int numWatermarks;
@@ -816,13 +802,13 @@ public class TimestampITCase extends TestLogger {
 		public void cancel() {}
 	}
 
-	public static class MyTimestampSourceInfinite implements SourceFunction<Integer>, StoppableFunction {
+	private static class MyTimestampSourceInfinite implements SourceFunction<Integer>, StoppableFunction {
 
 		private final long initialTime;
 		private final int numWatermarks;
 
 		private volatile boolean running = true;
-		
+
 		public MyTimestampSourceInfinite(long initialTime, int numWatermarks) {
 			this.initialTime = initialTime;
 			this.numWatermarks = numWatermarks;
@@ -834,7 +820,7 @@ public class TimestampITCase extends TestLogger {
 				ctx.collectWithTimestamp(i, initialTime + i);
 				ctx.emitWatermark(new Watermark(initialTime + i));
 			}
-			
+
 			while (running) {
 				Thread.sleep(20);
 			}
@@ -851,7 +837,7 @@ public class TimestampITCase extends TestLogger {
 		}
 	}
 
-	public static class MyNonWatermarkingSource implements SourceFunction<Integer> {
+	private static class MyNonWatermarkingSource implements SourceFunction<Integer> {
 
 		int numWatermarks;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
index 903179d..b347c16 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -84,7 +85,7 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 
 			@Override
 			public void cancel() {}
-			
+
 		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
 
 		source1

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java
index 8fc8372..804c19b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/EvenOddOutputSelector.java
@@ -15,12 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.streaming.runtime.util;
 
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 
 import java.util.Collections;
 
+/**
+ * {@link OutputSelector} mapping integers to "even" and "odd" streams.
+ */
 public class EvenOddOutputSelector implements OutputSelector<Integer> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java
index 6667446..2699ba8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/NoOpIntMap.java
@@ -15,10 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.streaming.runtime.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
 
+/**
+ * Identity mapper.
+ */
 public class NoOpIntMap implements MapFunction<Integer, Integer> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java
index 21d5294..8e60735 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/ReceiveCheckNoOpSink.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.streaming.runtime.util;
 
 import org.apache.flink.configuration.Configuration;
@@ -25,6 +26,11 @@ import java.util.List;
 
 import static org.junit.Assert.assertTrue;
 
+/**
+ * SinkFunction asserting that at least one record was collected.
+ *
+ * @param <T> element type
+ */
 public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
 	private List<T> received;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
index 3fabb4b..8d81ede 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListResultSink.java
@@ -17,15 +17,18 @@
 
 package org.apache.flink.test.streaming.runtime.util;
 
-import java.util.Collections;
-import java.util.Comparator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.TreeSet;
 
+/**
+ * Thread-safe sink for collecting elements into an on-heap list.
+ *
+ * @param <T> element type
+ */
 public class TestListResultSink<T> extends RichSinkFunction<T> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java
index 19ca8eb..b0b03e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/util/TestListWrapper.java
@@ -21,6 +21,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+/**
+ * Catalog for lists stored by {@link TestListResultSink}.
+ */
 public class TestListWrapper {
 
 	private static TestListWrapper instance;
@@ -61,4 +64,4 @@ public class TestListWrapper {
 		return list;
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java b/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java
index 9b3764d..e0f8b15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+/**
+ * FlatMap for splitting strings.
+ */
 public final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index e4dd535..54fe879 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.DynamicCodeLoadingException;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -58,6 +59,7 @@ import org.junit.runners.Parameterized;
 
 import javax.tools.JavaCompiler;
 import javax.tools.ToolProvider;
+
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -73,6 +75,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests the state migration behaviour when the underlying POJO type changes
+ * and one tries to recover from old state.
+ */
 @RunWith(Parameterized.class)
 public class PojoSerializerUpgradeTest extends TestLogger {
 
@@ -168,7 +174,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 		"@Override public String toString() {return \"(\" + a + \")\";}}";
 
 	/**
-	 * We should be able to handle a changed field order of a POJO as keyed state
+	 * We should be able to handle a changed field order of a POJO as keyed state.
 	 */
 	@Test
 	public void testChangedFieldOrderWithKeyedState() throws Exception {
@@ -176,7 +182,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	}
 
 	/**
-	 * We should be able to handle a changed field order of a POJO as operator state
+	 * We should be able to handle a changed field order of a POJO as operator state.
 	 */
 	@Test
 	public void testChangedFieldOrderWithOperatorState() throws Exception {
@@ -184,7 +190,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	}
 
 	/**
-	 * Changing field types of a POJO as keyed state should require a state migration
+	 * Changing field types of a POJO as keyed state should require a state migration.
 	 */
 	@Test
 	public void testChangedFieldTypesWithKeyedState() throws Exception {
@@ -201,7 +207,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	}
 
 	/**
-	 * Changing field types of a POJO as operator state should require a state migration
+	 * Changing field types of a POJO as operator state should require a state migration.
 	 */
 	@Test
 	public void testChangedFieldTypesWithOperatorState() throws Exception {
@@ -218,7 +224,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	}
 
 	/**
-	 * Adding fields to a POJO as keyed state should require a state migration
+	 * Adding fields to a POJO as keyed state should require a state migration.
 	 */
 	@Test
 	public void testAdditionalFieldWithKeyedState() throws Exception {
@@ -235,7 +241,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	}
 
 	/**
-	 * Adding fields to a POJO as operator state should require a state migration
+	 * Adding fields to a POJO as operator state should require a state migration.
 	 */
 	@Test
 	public void testAdditionalFieldWithOperatorState() throws Exception {
@@ -252,7 +258,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	}
 
 	/**
-	 * Removing fields from a POJO as keyed state should require a state migration
+	 * Removing fields from a POJO as keyed state should require a state migration.
 	 */
 	@Test
 	public void testMissingFieldWithKeyedState() throws Exception {
@@ -269,7 +275,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 	}
 
 	/**
-	 * Removing fields from a POJO as operator state should require a state migration
+	 * Removing fields from a POJO as operator state should require a state migration.
 	 */
 	@Test
 	public void testMissingFieldWithOperatorState() throws Exception {
@@ -380,7 +386,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
 			harness.processElement(value, timestamp++);
 		}
 
-
 		long checkpointId = 1L;
 		long checkpointTimestamp = timestamp + 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/util/CoordVector.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/CoordVector.java b/flink-tests/src/test/java/org/apache/flink/test/util/CoordVector.java
index 537c61e..aadeaeb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/CoordVector.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/CoordVector.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.test.util;
 
-import java.io.IOException;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.Value;
 
+import java.io.IOException;
+
 /**
  * Implements a feature vector as a multi-dimensional point. Coordinates of that point
  * (= the features) are stored as double values. The distance between two feature vectors is
@@ -31,7 +31,7 @@ import org.apache.flink.types.Value;
  */
 public final class CoordVector implements Value, Comparable<CoordVector> {
 	private static final long serialVersionUID = 1L;
-	
+
 	// coordinate array
 	private double[] coordinates;
 
@@ -44,7 +44,7 @@ public final class CoordVector implements Value, Comparable<CoordVector> {
 
 	/**
 	 * Initializes a coordinate vector.
-	 * 
+	 *
 	 * @param coordinates The coordinate vector of a multi-dimensional point.
 	 */
 	public CoordVector(Double[] coordinates) {
@@ -56,7 +56,7 @@ public final class CoordVector implements Value, Comparable<CoordVector> {
 
 	/**
 	 * Initializes a coordinate vector.
-	 * 
+	 *
 	 * @param coordinates The coordinate vector of a multi-dimensional point.
 	 */
 	public CoordVector(double[] coordinates) {
@@ -65,16 +65,16 @@ public final class CoordVector implements Value, Comparable<CoordVector> {
 
 	/**
 	 * Returns the coordinate vector of a multi-dimensional point.
-	 * 
+	 *
 	 * @return The coordinate vector of a multi-dimensional point.
 	 */
 	public double[] getCoordinates() {
 		return this.coordinates;
 	}
-	
+
 	/**
 	 * Sets the coordinate vector of a multi-dimensional point.
-	 * 
+	 *
 	 * @param coordinates The dimension values of the point.
 	 */
 	public void setCoordinates(double[] coordinates) {
@@ -84,7 +84,7 @@ public final class CoordVector implements Value, Comparable<CoordVector> {
 	/**
 	 * Computes the Euclidian distance between this coordinate vector and a
 	 * second coordinate vector.
-	 * 
+	 *
 	 * @param cv The coordinate vector to which the distance is computed.
 	 * @return The Euclidian distance to coordinate vector cv. If cv has a
 	 *         different length than this coordinate vector, -1 is returned.
@@ -98,12 +98,11 @@ public final class CoordVector implements Value, Comparable<CoordVector> {
 		double quadSum = 0.0;
 		for (int i = 0; i < this.coordinates.length; i++) {
 			double diff = this.coordinates[i] - cv.coordinates[i];
-			quadSum += diff*diff;
+			quadSum += diff * diff;
 		}
 		return Math.sqrt(quadSum);
 	}
 
-
 	@Override
 	public void read(DataInputView in) throws IOException {
 		int length = in.readInt();
@@ -113,7 +112,6 @@ public final class CoordVector implements Value, Comparable<CoordVector> {
 		}
 	}
 
-
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		out.writeInt(this.coordinates.length);
@@ -124,7 +122,7 @@ public final class CoordVector implements Value, Comparable<CoordVector> {
 
 	/**
 	 * Compares this coordinate vector to another key.
-	 * 
+	 *
 	 * @return -1 if the other key is not of type CoordVector. If the other
 	 *         key is also a CoordVector but its length differs from this
 	 *         coordinates vector, -1 is return if this coordinate vector is

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
index 1409848..dc37c5d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/DataSetUtilsITCase.java
@@ -29,8 +29,9 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple8;
 import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.operators.util.CollectionDataSets;
 import org.apache.flink.types.DoubleValue;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,6 +44,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+/**
+ * Integration tests for {@link DataSetUtils}.
+ */
 @RunWith(Parameterized.class)
 public class DataSetUtilsITCase extends MultipleProgramsTestBase {
 
@@ -52,14 +56,14 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testCountElementsPerPartition() throws Exception {
-	 	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-	 	long expectedSize = 100L;
-	 	DataSet<Long> numbers = env.generateSequence(0, expectedSize - 1);
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		long expectedSize = 100L;
+		DataSet<Long> numbers = env.generateSequence(0, expectedSize - 1);
 
-	 	DataSet<Tuple2<Integer, Long>> ds = DataSetUtils.countElementsPerPartition(numbers);
+		DataSet<Tuple2<Integer, Long>> ds = DataSetUtils.countElementsPerPartition(numbers);
 
-	 	Assert.assertEquals(env.getParallelism(), ds.count());
-	 	Assert.assertEquals(expectedSize, ds.sum(1).collect().get(0).f1.longValue());
+		Assert.assertEquals(env.getParallelism(), ds.count());
+		Assert.assertEquals(expectedSize, ds.sum(1).collect().get(0).f1.longValue());
 	}
 
 	@Test
@@ -90,7 +94,7 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
 		long expectedSize = 100L;
 		DataSet<Long> numbers = env.generateSequence(1L, expectedSize);
 
-		DataSet<Long> ids = DataSetUtils.zipWithUniqueId(numbers).map(new MapFunction<Tuple2<Long,Long>, Long>() {
+		DataSet<Long> ids = DataSetUtils.zipWithUniqueId(numbers).map(new MapFunction<Tuple2<Long, Long>, Long>() {
 			@Override
 			public Long map(Tuple2<Long, Long> value) throws Exception {
 				return value.f0;
@@ -118,14 +122,14 @@ public class DataSetUtilsITCase extends MultipleProgramsTestBase {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		List<Tuple8<Short, Integer, Long, Float, Double, String, Boolean, DoubleValue>> data = new ArrayList<>();
-		data.add(new Tuple8<>((short)1, 1, 100L, 0.1f, 1.012376, "hello", false, new DoubleValue(50.0)));
-		data.add(new Tuple8<>((short)2, 2, 1000L, 0.2f, 2.003453, "hello", true, new DoubleValue(50.0)));
-		data.add(new Tuple8<>((short)4, 10, 10000L, 0.2f, 75.00005, "null", true, new DoubleValue(50.0)));
-		data.add(new Tuple8<>((short)10, 4, 100L, 0.9f, 79.5, "", true, new DoubleValue(50.0)));
-		data.add(new Tuple8<>((short)5, 5, 1000L, 0.2f, 10.0000001, "a", false, new DoubleValue(50.0)));
-		data.add(new Tuple8<>((short)6, 6, 10L, 0.1f, 0.0000000000023, "", true, new DoubleValue(100.0)));
-		data.add(new Tuple8<>((short)7, 7, 1L, 0.2f, Double.POSITIVE_INFINITY, "abcdefghijklmnop", true, new DoubleValue(100.0)));
-		data.add(new Tuple8<>((short)8, 8, -100L, 0.001f, Double.NaN, "abcdefghi", true, new DoubleValue(100.0)));
+		data.add(new Tuple8<>((short) 1, 1, 100L, 0.1f, 1.012376, "hello", false, new DoubleValue(50.0)));
+		data.add(new Tuple8<>((short) 2, 2, 1000L, 0.2f, 2.003453, "hello", true, new DoubleValue(50.0)));
+		data.add(new Tuple8<>((short) 4, 10, 10000L, 0.2f, 75.00005, "null", true, new DoubleValue(50.0)));
+		data.add(new Tuple8<>((short) 10, 4, 100L, 0.9f, 79.5, "", true, new DoubleValue(50.0)));
+		data.add(new Tuple8<>((short) 5, 5, 1000L, 0.2f, 10.0000001, "a", false, new DoubleValue(50.0)));
+		data.add(new Tuple8<>((short) 6, 6, 10L, 0.1f, 0.0000000000023, "", true, new DoubleValue(100.0)));
+		data.add(new Tuple8<>((short) 7, 7, 1L, 0.2f, Double.POSITIVE_INFINITY, "abcdefghijklmnop", true, new DoubleValue(100.0)));
+		data.add(new Tuple8<>((short) 8, 8, -100L, 0.001f, Double.NaN, "abcdefghi", true, new DoubleValue(100.0)));
 
 		Collections.shuffle(data);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/util/InfiniteIntegerInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/InfiniteIntegerInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/util/InfiniteIntegerInputFormat.java
index 8ebb87a..6f6f2a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/InfiniteIntegerInputFormat.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/InfiniteIntegerInputFormat.java
@@ -15,11 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.util;
 
-import java.io.IOException;
 import org.apache.flink.api.common.io.GenericInputFormat;
 
+import java.io.IOException;
+
+/**
+ * Generates an infinite series of integer elements with optional read delay.
+ */
 public class InfiniteIntegerInputFormat extends GenericInputFormat<Integer> {
 	private static final long serialVersionUID = 1L;
 	private static final int DELAY = 20;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/util/InfiniteIntegerTupleInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/InfiniteIntegerTupleInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/util/InfiniteIntegerTupleInputFormat.java
index ad6edd4..660ff08 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/InfiniteIntegerTupleInputFormat.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/InfiniteIntegerTupleInputFormat.java
@@ -15,12 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.util;
 
-import java.io.IOException;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 
+import java.io.IOException;
+
+/**
+ * Generates an infinite series of integer 2-tuple elements with optional read delay.
+ */
 public class InfiniteIntegerTupleInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {
 	private static final long serialVersionUID = 1L;
 	private static final int DELAY = 20;

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/util/PointFormatter.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/PointFormatter.java b/flink-tests/src/test/java/org/apache/flink/test/util/PointFormatter.java
index b0dd78e..6c4546e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/PointFormatter.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/PointFormatter.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.util;
 
-import java.text.DecimalFormat;
-import java.text.DecimalFormatSymbols;
 import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter;
 import org.apache.flink.api.java.tuple.Tuple2;
 
+import java.text.DecimalFormat;
+import java.text.DecimalFormatSymbols;
+
 /**
  * Writes records that contain an id and a CoordVector. The output format is line-based, i.e. one record is written to a
  * line and terminated with '\n'. Within a line the first '|' character separates the id from the CoordVector. The

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/util/PointInFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/PointInFormat.java b/flink-tests/src/test/java/org/apache/flink/test/util/PointInFormat.java
index 424b781..492499a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/PointInFormat.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/PointInFormat.java
@@ -15,13 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.util;
 
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
 
 /**
  * Generates records with an id and a and CoordVector. The input format is line-based, i.e. one record is read from one
@@ -89,7 +91,7 @@ public class PointInFormat extends DelimitedInputFormat<Tuple2<Integer, CoordVec
 		for (int i = 0; i < this.pointValues.length; i++) {
 			this.pointValues[i] = this.dimensionValues.get(i);
 		}
-		
+
 		reuse.f1 = new CoordVector(this.pointValues);
 		return reuse;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
index 4413d3f..1606783 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
@@ -25,8 +25,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import static org.junit.Assert.fail;
 
+/**
+ * Test utilities.
+ */
 public class TestUtils {
-	
+
 	public static JobExecutionResult tryExecute(StreamExecutionEnvironment see, String name) throws Exception {
 		try {
 			return see.execute(name);
@@ -46,7 +49,7 @@ public class TestUtils {
 				}
 			}
 		}
-		
+
 		return null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/util/UniformIntTupleGeneratorInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/UniformIntTupleGeneratorInputFormat.java b/flink-tests/src/test/java/org/apache/flink/test/util/UniformIntTupleGeneratorInputFormat.java
index c779275..2d34d0d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/UniformIntTupleGeneratorInputFormat.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/UniformIntTupleGeneratorInputFormat.java
@@ -15,14 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.test.util;
 
-import java.io.IOException;
 import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
 
+import java.io.IOException;
+
+/**
+ * Generates a series of integer 2-tuples.
+ */
 public class UniformIntTupleGeneratorInputFormat extends GenericInputFormat<Tuple2<Integer, Integer>> {
 	private final int keyTotal;
 	private final int valueTotal;
@@ -38,7 +43,6 @@ public class UniformIntTupleGeneratorInputFormat extends GenericInputFormat<Tupl
 	public void open(GenericInputSplit split) throws IOException {
 		super.open(split);
 		this.generator = new UniformIntTupleGenerator(keyTotal, valueTotal, false);
-
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGenerator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGenerator.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGenerator.java
index fcc45d1..edd3e44 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGenerator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGenerator.java
@@ -19,7 +19,7 @@
 package org.apache.flink.test.windowing.sessionwindows;
 
 /**
- * Basic interface for event generators
+ * Basic interface for event generators.
  *
  * @param <K> session key type
  * @param <E> session event type

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGeneratorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGeneratorFactory.java
index 5f91ced..d4c1a47 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGeneratorFactory.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGeneratorFactory.java
@@ -23,9 +23,8 @@ import org.apache.flink.util.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
 
-
 /**
- * Produces the session event generators
+ * Produces the session event generators.
  *
  * @param <K> type of session keys
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/GeneratorConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/GeneratorConfiguration.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/GeneratorConfiguration.java
index acfe544..7591524 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/GeneratorConfiguration.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/GeneratorConfiguration.java
@@ -19,7 +19,7 @@
 package org.apache.flink.test.windowing.sessionwindows;
 
 /**
- * Configuration for event generators
+ * Configuration for event generators.
  */
 public final class GeneratorConfiguration {
 
@@ -35,10 +35,11 @@ public final class GeneratorConfiguration {
 	// hint for the maximum additional gap introduced between event times of two generators to separate sessions
 	private final long maxAdditionalSessionGap;
 
-	public GeneratorConfiguration(long allowedLateness,
-	                              int lateEventsWithinLateness,
-	                              int lateEventsAfterLateness,
-	                              long maxAdditionalSessionGap) {
+	public GeneratorConfiguration(
+			long allowedLateness,
+			int lateEventsWithinLateness,
+			int lateEventsAfterLateness,
+			long maxAdditionalSessionGap) {
 		this.allowedLateness = allowedLateness;
 		this.lateEventsWithinLateness = lateEventsWithinLateness;
 		this.lateEventsAfterLateness = lateEventsAfterLateness;
@@ -61,10 +62,11 @@ public final class GeneratorConfiguration {
 		return maxAdditionalSessionGap;
 	}
 
-	public static GeneratorConfiguration of(long allowedLateness,
-	                                        int lateEventsPerSessionWithinLateness,
-	                                        int lateEventsPerSessionOutsideLateness,
-	                                        long maxAdditionalSessionGap) {
+	public static GeneratorConfiguration of(
+			long allowedLateness,
+			int lateEventsPerSessionWithinLateness,
+			int lateEventsPerSessionOutsideLateness,
+			long maxAdditionalSessionGap) {
 		return new GeneratorConfiguration(
 				allowedLateness,
 				lateEventsPerSessionWithinLateness,
@@ -81,4 +83,4 @@ public final class GeneratorConfiguration {
 				", maxAdditionalSessionGap=" + maxAdditionalSessionGap +
 				'}';
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/GeneratorEventFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/GeneratorEventFactory.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/GeneratorEventFactory.java
index 08d5858..0d12e58 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/GeneratorEventFactory.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/GeneratorEventFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.windowing.sessionwindows;
 /**
  * Factory that produces events of keyed session generators
  *
- * If types of generated events diverge more, we can consider also specify the input parameters to createEvent(...) as
+ * <p>If types of generated events diverge more, we can consider also specify the input parameters to createEvent(...) as
  * a generic object type (containing all the data).
  *
  * @param <K> type of produced event key
@@ -38,10 +38,11 @@ public interface GeneratorEventFactory<K, E> {
 	 * @param timing          indicator for lateness
 	 * @return event for an keyed event generator
 	 */
-	E createEvent(K key,
-	              int sessionId,
-	              int eventId,
-	              long eventTimestamp,
-	              long globalWatermark,
-	              SessionEventGeneratorImpl.Timing timing);
+	E createEvent(
+		K key,
+		int sessionId,
+		int eventId,
+		long eventTimestamp,
+		long globalWatermark,
+		SessionEventGeneratorImpl.Timing timing);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/LongRandomGenerator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/LongRandomGenerator.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/LongRandomGenerator.java
index 132d307..0194959 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/LongRandomGenerator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/LongRandomGenerator.java
@@ -21,6 +21,9 @@ package org.apache.flink.test.windowing.sessionwindows;
 import java.util.Collection;
 import java.util.Random;
 
+/**
+ * Provide additional PRNG methods for selecting in a range and for collection choice.
+ */
 public class LongRandomGenerator extends Random {
 
 	static final long serialVersionUID = 32523525277L;
@@ -60,7 +63,7 @@ public class LongRandomGenerator extends Random {
 	/**
 	 * @return a randomly chosen element from collection
 	 */
-	public <T> T choseRandomElement(Collection<T> collection) {
+	public <T> T chooseRandomElement(Collection<T> collection) {
 		int choice = choseRandomIndex(collection);
 		for (T key : collection) {
 			if (choice == 0) {
@@ -70,4 +73,4 @@ public class LongRandomGenerator extends Random {
 		}
 		return null;
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/ParallelSessionsEventGenerator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/ParallelSessionsEventGenerator.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/ParallelSessionsEventGenerator.java
index b185f9a..56a95ce 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/ParallelSessionsEventGenerator.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/ParallelSessionsEventGenerator.java
@@ -27,10 +27,10 @@ import java.util.Set;
 /**
  * Generator that employs several (sub-) event generators to produce events for multiple sessions in parallel, i.e.
  * events are emitted in an interleaved way.
- * <p>
- * Even events that belong to different sessions for the same key can be generated in parallel.
- * <p>
- * The watermark is computed as the minimum of watermarks among all current sub-generators.
+ *
+ * <p>Even events that belong to different sessions for the same key can be generated in parallel.
+ *
+ * <p>The watermark is computed as the minimum of watermarks among all current sub-generators.
  *
  * @param <K> session key type
  * @param <E> session event type
@@ -107,7 +107,7 @@ public class ParallelSessionsEventGenerator<K, E> {
 					if (generatorFactory.getProducedGeneratorsCount() < sessionCountLimit) {
 						subGeneratorLists.set(index,
 								generatorFactory.newSessionGeneratorForKey(
-										randomGenerator.choseRandomElement(sessionKeys), getWatermark()));
+										randomGenerator.chooseRandomElement(sessionKeys), getWatermark()));
 					} else {
 						// otherwise removes the sub-generator and shrinks the list of open sessions permanently
 						subGeneratorLists.remove(index);
@@ -140,8 +140,8 @@ public class ParallelSessionsEventGenerator<K, E> {
 	private void initParallelSessionGenerators(int parallelSessions) {
 		for (int i = 0; i < parallelSessions && generatorFactory.getProducedGeneratorsCount() < sessionCountLimit; ++i) {
 			subGeneratorLists.add(generatorFactory.newSessionGeneratorForKey(
-					randomGenerator.choseRandomElement(sessionKeys), 0L));
+					randomGenerator.chooseRandomElement(sessionKeys), 0L));
 		}
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionConfiguration.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionConfiguration.java
index cb481f5..d329a30 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionConfiguration.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionConfiguration.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.windowing.sessionwindows;
 import org.apache.flink.util.Preconditions;
 
 /**
- * Configuration data for a session
+ * Configuration data for a session.
  *
  * @param <K> type of session key
  * @param <E> type of session event
@@ -122,4 +122,4 @@ public final class SessionConfiguration<K, E> {
 				getNumberOfTimelyEvents(),
 				getEventFactory());
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEvent.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEvent.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEvent.java
index 1f5ac60..94bcc2b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEvent.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEvent.java
@@ -36,7 +36,7 @@ public final class SessionEvent<K, V> {
 
 	// event timestamp (in ms)
 	private long eventTimestamp;
-	
+
 	public SessionEvent() {
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEventGeneratorImpl.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEventGeneratorImpl.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEventGeneratorImpl.java
index e8a6df6..34f37a7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEventGeneratorImpl.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEventGeneratorImpl.java
@@ -33,7 +33,7 @@ import java.util.List;
 public class SessionEventGeneratorImpl<K, E> implements EventGenerator<K, E> {
 
 	/**
-	 * Event timing w.r.t the global watermark
+	 * Event timing w.r.t the global watermark.
 	 */
 	public enum Timing {
 		TIMELY, IN_LATENESS, AFTER_LATENESS
@@ -126,7 +126,7 @@ public class SessionEventGeneratorImpl<K, E> implements EventGenerator<K, E> {
 	}
 
 	/**
-	 * pre-computes and stores the timestamps for timely events in this session in a list (ordered)
+	 * Pre-computes and stores the timestamps for timely events in this session in a list (ordered).
 	 *
 	 * @param minTimestamp              the minimum event time in the session
 	 * @param onTimeEventCountInSession the number of timestamps to generate
@@ -269,7 +269,7 @@ public class SessionEventGeneratorImpl<K, E> implements EventGenerator<K, E> {
 	}
 
 	/**
-	 * internal generator delegate for producing session events that are timely
+	 * Internal generator delegate for producing session events that are timely.
 	 */
 	private class TimelyGenerator extends AbstractEventGenerator {
 
@@ -300,7 +300,7 @@ public class SessionEventGeneratorImpl<K, E> implements EventGenerator<K, E> {
 	}
 
 	/**
-	 * internal generator delegate for producing late session events with timestamps within the allowed lateness
+	 * Internal generator delegate for producing late session events with timestamps within the allowed lateness.
 	 */
 	private class InLatenessGenerator extends AbstractEventGenerator {
 
@@ -331,7 +331,7 @@ public class SessionEventGeneratorImpl<K, E> implements EventGenerator<K, E> {
 	}
 
 	/**
-	 * internal generator delegate for producing late session events with timestamps after the lateness
+	 * Internal generator delegate for producing late session events with timestamps after the lateness.
 	 */
 	private class AfterLatenessGenerator extends AbstractEventGenerator {
 
@@ -361,4 +361,4 @@ public class SessionEventGeneratorImpl<K, E> implements EventGenerator<K, E> {
 		}
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionGeneratorConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionGeneratorConfiguration.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionGeneratorConfiguration.java
index 4f29eb3..edd2ba5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionGeneratorConfiguration.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionGeneratorConfiguration.java
@@ -57,4 +57,4 @@ public final class SessionGeneratorConfiguration<K, E> {
 				", generatorConfiguration=" + generatorConfiguration +
 				'}';
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
index 4e4cb20..494b8d6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -44,7 +45,7 @@ import java.util.List;
 import java.util.Set;
 
 /**
- * ITCase for Session Windows
+ * ITCase for Session Windows.
  */
 public class SessionWindowITCase extends StreamingMultipleProgramsTestBase {
 
@@ -100,11 +101,10 @@ public class SessionWindowITCase extends StreamingMultipleProgramsTestBase {
 			WindowFunction<SessionEvent<Integer, TestEventPayload>,
 					String, Tuple, TimeWindow> windowFunction) throws Exception {
 
-
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		WindowedStream<SessionEvent<Integer, TestEventPayload>, Tuple, TimeWindow> windowedStream
-				= env.addSource(dataSource).keyBy("sessionKey")
+		WindowedStream<SessionEvent<Integer, TestEventPayload>, Tuple, TimeWindow> windowedStream =
+				env.addSource(dataSource).keyBy("sessionKey")
 				.window(EventTimeSessionWindows.withGap(Time.milliseconds(MAX_SESSION_EVENT_GAP_MS)));
 
 		if (ALLOWED_LATENESS_MS != Long.MAX_VALUE) {
@@ -129,7 +129,7 @@ public class SessionWindowITCase extends StreamingMultipleProgramsTestBase {
 	}
 
 	/**
-	 * Window function that performs correctness checks for this test case
+	 * Window function that performs correctness checks for this test case.
 	 */
 	private static final class ValidatingWindowFunction extends RichWindowFunction<SessionEvent<Integer,
 			TestEventPayload>, String, Tuple, TimeWindow> {
@@ -200,7 +200,7 @@ public class SessionWindowITCase extends StreamingMultipleProgramsTestBase {
 	}
 
 	/**
-	 * A data source that is fed from a ParallelSessionsEventGenerator
+	 * A data source that is fed from a ParallelSessionsEventGenerator.
 	 */
 	private static final class SessionEventGeneratorDataSource
 			implements SourceFunction<SessionEvent<Integer, TestEventPayload>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/TestEventPayload.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/TestEventPayload.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/TestEventPayload.java
index 13b1180..c532269 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/TestEventPayload.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/TestEventPayload.java
@@ -21,7 +21,7 @@ package org.apache.flink.test.windowing.sessionwindows;
 import org.apache.flink.util.Preconditions;
 
 /**
- * Test payload that contains useful information for the correctness checks in our test
+ * Test payload that contains useful information for the correctness checks in our test.
  */
 public final class TestEventPayload {
 
@@ -103,4 +103,4 @@ public final class TestEventPayload {
 		return new TestEventPayload(watermark, sessionId, eventId, timing);
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9bd491e0/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
index 2dabb56..9d286fe 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupCombineITCase.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.test.javaApiOperators.GroupCombineITCase.ScalaGroupCombineFunctionExample
+import org.apache.flink.test.operators.GroupCombineITCase.ScalaGroupCombineFunctionExample
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.apache.flink.util.Collector


Mime
View raw message