flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [01/10] flink git commit: [FLINK-6603] [streaming] Enable checkstyle on test sources
Date Mon, 22 May 2017 21:50:20 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7efa8ad34 -> 12b4185c6


http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index a9d2ddf..5b995c6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -27,7 +27,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -73,6 +72,9 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * Mock {@link Environment}.
+ */
 public class StreamMockEnvironment implements Environment {
 
 	private final TaskInfo taskInfo;
@@ -106,7 +108,7 @@ public class StreamMockEnvironment implements Environment {
 	private volatile boolean wasFailedExternally = false;
 
 	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig,
-								 long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+								long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.taskInfo = new TaskInfo(
 			"", /* task name */
 			1, /* num key groups / max parallelism */
@@ -131,7 +133,7 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize,
-								 MockInputSplitProvider inputSplitProvider, int bufferSize) {
+								MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this(jobConfig, taskConfig, new ExecutionConfig(), memorySize, inputSplitProvider, bufferSize);
 	}
 
@@ -183,7 +185,6 @@ public class StreamMockEnvironment implements Environment {
 				}
 			}).when(mockWriter).writeBufferToAllChannels(any(Buffer.class));
 
-
 			outputs.add(mockWriter);
 		}
 		catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index f8d5393..6e3c299 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -20,15 +20,16 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -41,6 +42,9 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+/**
+ * Test checkpoint cancellation barrier.
+ */
 public class StreamTaskCancellationBarrierTest {
 
 	/**
@@ -71,9 +75,8 @@ public class StreamTaskCancellationBarrierTest {
 	/**
 	 * This test verifies (for onw input tasks) that the Stream tasks react the following way to
 	 * receiving a checkpoint cancellation barrier:
-	 *
 	 *   - send a "decline checkpoint" notification out (to the JobManager)
-	 *   - emit a cancellation barrier downstream
+	 *   - emit a cancellation barrier downstream.
 	 */
 	@Test
 	public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
@@ -115,11 +118,10 @@ public class StreamTaskCancellationBarrierTest {
 	}
 
 	/**
-	 * This test verifies (for onw input tasks) that the Stream tasks react the following way to
+	 * This test verifies (for one input tasks) that the Stream tasks react the following way to
 	 * receiving a checkpoint cancellation barrier:
-	 *
 	 *   - send a "decline checkpoint" notification out (to the JobManager)
-	 *   - emit a cancellation barrier downstream
+	 *   - emit a cancellation barrier downstream.
 	 */
 	@Test
 	public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 546188e..8957255 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import akka.dispatch.Futures;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
@@ -88,6 +87,8 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+
+import akka.dispatch.Futures;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -137,13 +138,16 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
+/**
+ * Tests for {@link StreamTask}.
+ */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StreamTask.class)
 @PowerMockIgnore("org.apache.log4j.*")
 @SuppressWarnings("deprecation")
 public class StreamTaskTest extends TestLogger {
 
-	private static OneShotLatch SYNC_LATCH;
+	private static OneShotLatch syncLatch;
 
 	/**
 	 * This test checks that cancel calls that are issued before the operator is
@@ -242,7 +246,7 @@ public class StreamTaskTest extends TestLogger {
 
 	@Test
 	public void testCancellationNotBlockedOnLock() throws Exception {
-		SYNC_LATCH = new OneShotLatch();
+		syncLatch = new OneShotLatch();
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		Task task = createTask(CancelLockingTask.class, cfg, new Configuration());
@@ -251,7 +255,7 @@ public class StreamTaskTest extends TestLogger {
 		// execution state RUNNING is not enough, we need to wait until the stream task's run() method
 		// is entered
 		task.startTaskThread();
-		SYNC_LATCH.await();
+		syncLatch.await();
 
 		// cancel the execution - this should lead to smooth shutdown
 		task.cancelExecution();
@@ -262,7 +266,7 @@ public class StreamTaskTest extends TestLogger {
 
 	@Test
 	public void testCancellationFailsWithBlockingLock() throws Exception {
-		SYNC_LATCH = new OneShotLatch();
+		syncLatch = new OneShotLatch();
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		Task task = createTask(CancelFailingTask.class, cfg, new Configuration());
@@ -271,7 +275,7 @@ public class StreamTaskTest extends TestLogger {
 		// execution state RUNNING is not enough, we need to wait until the stream task's run() method
 		// is entered
 		task.startTaskThread();
-		SYNC_LATCH.await();
+		syncLatch.await();
 
 		// cancel the execution - this should lead to smooth shutdown
 		task.cancelExecution();
@@ -422,7 +426,7 @@ public class StreamTaskTest extends TestLogger {
 	/**
 	 * FLINK-5667
 	 *
-	 * Tests that a concurrent cancel operation does not discard the state handles of an
+	 * <p>Tests that a concurrent cancel operation does not discard the state handles of an
 	 * acknowledged checkpoint. The situation can only happen if the cancel call is executed
 	 * after Environment.acknowledgeCheckpoint() and before the
 	 * CloseableRegistry.unregisterClosable() call.
@@ -534,7 +538,7 @@ public class StreamTaskTest extends TestLogger {
 	/**
 	 * FLINK-5667
 	 *
-	 * Tests that a concurrent cancel operation discards the state handles of a not yet
+	 * <p>Tests that a concurrent cancel operation discards the state handles of a not yet
 	 * acknowledged checkpoint and prevents sending an acknowledge message to the
 	 * CheckpointCoordinator. The situation can only happen if the cancel call is executed
 	 * before Environment.acknowledgeCheckpoint().
@@ -560,11 +564,11 @@ public class StreamTaskTest extends TestLogger {
 				completeSubtask.await();
 
 				return new SubtaskState(
-					(ChainedStateHandle<StreamStateHandle>)invocation.getArguments()[0],
-					(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[1],
-					(ChainedStateHandle<OperatorStateHandle>)invocation.getArguments()[2],
-					(KeyedStateHandle)invocation.getArguments()[3],
-					(KeyedStateHandle)invocation.getArguments()[4]);
+					(ChainedStateHandle<StreamStateHandle>) invocation.getArguments()[0],
+					(ChainedStateHandle<OperatorStateHandle>) invocation.getArguments()[1],
+					(ChainedStateHandle<OperatorStateHandle>) invocation.getArguments()[2],
+					(KeyedStateHandle) invocation.getArguments()[3],
+					(KeyedStateHandle) invocation.getArguments()[4]);
 			}
 		});
 
@@ -643,7 +647,7 @@ public class StreamTaskTest extends TestLogger {
 	/**
 	 * FLINK-5985
 	 *
-	 * This test ensures that empty snapshots (no op/keyed stated whatsoever) will be reported as stateless tasks. This
+	 * <p>This test ensures that empty snapshots (no op/keyed stated whatsoever) will be reported as stateless tasks. This
 	 * happens by translating an empty {@link SubtaskState} into reporting 'null' to #acknowledgeCheckpoint.
 	 */
 	@Test
@@ -819,7 +823,7 @@ public class StreamTaskTest extends TestLogger {
 	//  Test operators
 	// ------------------------------------------------------------------------
 
-	public static class SlowlyDeserializingOperator extends StreamSource<Long, SourceFunction<Long>> {
+	private static class SlowlyDeserializingOperator extends StreamSource<Long, SourceFunction<Long>> {
 		private static final long serialVersionUID = 1L;
 
 		private volatile boolean canceled = false;
@@ -955,7 +959,7 @@ public class StreamTaskTest extends TestLogger {
 	}
 
 	/**
-	 * A task that locks if cancellation attempts to cleanly shut down
+	 * A task that locks if cancellation attempts to cleanly shut down.
 	 */
 	public static class CancelLockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
 
@@ -973,7 +977,7 @@ public class StreamTaskTest extends TestLogger {
 			latch.await();
 
 			// we are at the point where cancelling can happen
-			SYNC_LATCH.trigger();
+			syncLatch.trigger();
 
 			// just put this to sleep until it is interrupted
 			try {
@@ -999,7 +1003,7 @@ public class StreamTaskTest extends TestLogger {
 	}
 
 	/**
-	 * A task that locks if cancellation attempts to cleanly shut down
+	 * A task that locks if cancellation attempts to cleanly shut down.
 	 */
 	public static class CancelFailingTask extends StreamTask<String, AbstractStreamOperator<String>> {
 
@@ -1021,7 +1025,7 @@ public class StreamTaskTest extends TestLogger {
 				latch.await();
 
 				// we are at the point where cancelling can happen
-				SYNC_LATCH.trigger();
+				syncLatch.trigger();
 
 				// try to acquire the lock - this is not possible as long as the lock holder
 				// thread lives
@@ -1050,7 +1054,7 @@ public class StreamTaskTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * A thread that holds a lock as long as it lives
+	 * A thread that holds a lock as long as it lives.
 	 */
 	private static final class LockHolder extends Thread implements Closeable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 0be85b1..a02fe4e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -35,8 +35,8 @@ import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 
 import org.junit.Assert;
 
@@ -44,24 +44,20 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Test harness for testing a {@link StreamTask}.
  *
- * <p>
- * This mock Invokable provides the task with a basic runtime context and allows pushing elements
+ * <p>This mock Invokable provides the task with a basic runtime context and allows pushing elements
  * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
  * and events. You are free to modify the retrieved list.
  *
- * <p>
- * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
+ * <p>After setting up everything the Task can be invoked using {@link #invoke()}. This will start
  * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
  * thread to finish.
  *
- * <p>
- * When using this you need to add the following line to your test class to setup Powermock:
+ * <p>When using this you need to add the following line to your test class to setup Powermock:
  * {@code {@literal @}PrepareForTest({ResultPartitionWriter.class})}
  */
 public class StreamTaskTestHarness<OUT> {
@@ -135,7 +131,7 @@ public class StreamTaskTestHarness<OUT> {
 	 * if there will only be a single operator to be tested. The method will setup the
 	 * outgoing network connection for the operator.
 	 *
-	 * For more advanced test cases such as testing chains of multiple operators with the harness,
+	 * <p>For more advanced test cases such as testing chains of multiple operators with the harness,
 	 * please manually configure the stream config.
 	 */
 	public void setupOutputForSingletonOperatorChain() {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 1f8638e..890fc23 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -37,6 +37,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link SystemProcessingTimeService}.
+ */
 public class SystemProcessingTimeServiceTest extends TestLogger {
 
 	@Test
@@ -74,7 +77,7 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that the schedule at fixed rate callback is called under the given lock
+	 * Tests that the schedule at fixed rate callback is called under the given lock.
 	 */
 	@Test
 	public void testScheduleAtFixedRateHoldsLock() throws Exception {
@@ -123,7 +126,7 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 	 * Tests that SystemProcessingTimeService#scheduleAtFixedRate is actually triggered multiple
 	 * times.
 	 */
-	@Test(timeout=10000)
+	@Test(timeout = 10000)
 	public void testScheduleAtFixedRate() throws Exception {
 		final Object lock = new Object();
 		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
@@ -432,7 +435,7 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 			}
 		},
 			0L,
-			100L	);
+			100L);
 
 		latch.await();
 		assertTrue(exceptionWasThrown.get());

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index d465619..66531ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -42,8 +42,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  * Tests for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. Theses tests
  * implicitly also test the {@link org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}.
  *
- * <p>
- * Note:<br>
+ * <p>Note:<br>
  * We only use a {@link CoStreamMap} operator here. We also test the individual operators but Map is
  * used as a representative to test TwoInputStreamTask, since TwoInputStreamTask is used for all
  * TwoInputStreamOperators.
@@ -123,7 +122,6 @@ public class TwoInputStreamTaskTest {
 
 		testHarness.processElement(new Watermark(initialTime), 1, 0);
 
-
 		// now the output should still be empty
 		testHarness.waitForInputProcessing();
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -155,7 +153,6 @@ public class TwoInputStreamTaskTest {
 		testHarness.waitForInputProcessing();
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
-
 		// advance watermark from one of the inputs, now we should get a new one since the
 		// minimum increases
 		testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
@@ -279,7 +276,6 @@ public class TwoInputStreamTaskTest {
 				expectedOutput,
 				testHarness.getOutput());
 
-
 		List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
 		Assert.assertEquals(4, resultElements.size());
 	}
@@ -346,7 +342,6 @@ public class TwoInputStreamTaskTest {
 				expectedOutput,
 				testHarness.getOutput());
 
-
 		// Then give the earlier barrier, these should be ignored
 		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 0, 1);
 		testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forFullCheckpoint()), 1, 0);
@@ -354,7 +349,6 @@ public class TwoInputStreamTaskTest {
 
 		testHarness.waitForInputProcessing();
 
-
 		testHarness.endInput();
 
 		testHarness.waitForTaskCompletion();

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index 7ce4ab7..9b9038f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,13 +36,11 @@ import java.util.List;
 /**
  * Test harness for testing a {@link TwoInputStreamTask}.
  *
- * <p>
- * This mock Invokable provides the task with a basic runtime context and allows pushing elements
+ * <p>This mock Invokable provides the task with a basic runtime context and allows pushing elements
  * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
  * and events. You are free to modify the retrieved list.
  *
- * <p>
- * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
+ * <p>After setting up everything the Task can be invoked using {@link #invoke()}. This will start
  * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
  * thread to finish. Use {@link #processElement}
  * to send elements to the task. Use
@@ -49,8 +48,7 @@ import java.util.List;
  * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task
  * that data entry is finished.
  *
- * <p>
- * When Elements or Events are offered to the Task they are put into a queue. The input gates
+ * <p>When Elements or Events are offered to the Task they are put into a queue. The input gates
  * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
  * queues are empty. This must be used after entering some elements before checking the
  * desired output.

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
index 77178d7..b14492e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
@@ -18,19 +18,23 @@
 
 package org.apache.flink.streaming.util;
 
-import com.fasterxml.jackson.databind.util.JSONPObject;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
 
+import com.fasterxml.jackson.databind.util.JSONPObject;
 import org.junit.Test;
 
 import java.io.IOException;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link AbstractDeserializationSchema}.
+ */
 @SuppressWarnings("serial")
 public class AbstractDeserializationSchemaTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 7a8488f..0a517f0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.util.OutputTag;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
@@ -48,9 +48,9 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -64,7 +64,9 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -88,19 +90,19 @@ import static org.mockito.Mockito.when;
  */
 public class AbstractStreamOperatorTestHarness<OUT> {
 
-	final protected StreamOperator<OUT> operator;
+	protected final StreamOperator<OUT> operator;
 
-	final protected ConcurrentLinkedQueue<Object> outputList;
+	protected final ConcurrentLinkedQueue<Object> outputList;
 
-	final protected Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> sideOutputLists;
+	protected final Map<OutputTag<?>, ConcurrentLinkedQueue<Object>> sideOutputLists;
 
-	final protected StreamConfig config;
+	protected final StreamConfig config;
 
-	final protected ExecutionConfig executionConfig;
+	protected final ExecutionConfig executionConfig;
 
-	final protected TestProcessingTimeService processingTimeService;
+	protected final TestProcessingTimeService processingTimeService;
 
-	final protected StreamTask<?, ?> mockTask;
+	protected final StreamTask<?, ?> mockTask;
 
 	final Environment environment;
 
@@ -291,16 +293,14 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	}
 
 	/**
-	 * Calls
-	 * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
+	 * Calls {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}.
 	 */
 	public void setup() {
 		setup(null);
 	}
 
 	/**
-	 * Calls
-	 * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
+	 * Calls {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}.
 	 */
 	public void setup(TypeSerializer<OUT> outputSerializer) {
 		operator.setup(mockTask, config, new MockOutput(outputSerializer));
@@ -416,17 +416,14 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	 * and repacks them into a single {@link OperatorStateHandles} so that the parallelism of the test
 	 * can change arbitrarily (i.e. be able to scale both up and down).
 	 *
-	 * <p>
-	 * After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize
+	 * <p>After repacking the partial states, use {@link #initializeState(OperatorStateHandles)} to initialize
 	 * a new instance with the resulting state. Bear in mind that for parallelism greater than one, you
 	 * have to use the constructor {@link #AbstractStreamOperatorTestHarness(StreamOperator, int, int, int)}.
 	 *
-	 * <p>
-	 * <b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single
+	 * <p><b>NOTE: </b> each of the {@code handles} in the argument list is assumed to be from a single task of a single
 	 * operator (i.e. chain length of one).
 	 *
-	 * <p>
-	 * For an example of how to use it, have a look at
+	 * <p>For an example of how to use it, have a look at
 	 * {@link AbstractStreamOperatorTest#testStateAndTimerStateShufflingScalingDown()}.
 	 *
 	 * @param handles the different states to be merged.
@@ -540,7 +537,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory(
 				new JobID(),
 				"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
-		if(operator instanceof StreamCheckpointedOperator) {
+		if (operator instanceof StreamCheckpointedOperator) {
 			((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp);
 			return outStream.closeAndGetHandle();
 		} else {
@@ -549,7 +546,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	}
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()}
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()}.
 	 */
 	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
 		operator.notifyOfCompletedCheckpoint(checkpointId);
@@ -562,7 +559,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 	@Deprecated
 	@SuppressWarnings("deprecation")
 	public void restore(StreamStateHandle snapshot) throws Exception {
-		if(operator instanceof StreamCheckpointedOperator) {
+		if (operator instanceof StreamCheckpointedOperator) {
 			try (FSDataInputStream in = snapshot.openInputStream()) {
 				((StreamCheckpointedOperator) operator).restoreState(in);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
index fa68082..bd731b8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
@@ -26,6 +26,9 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
 
+/**
+ * Collecting {@link SourceFunction.SourceContext}.
+ */
 public class CollectingSourceContext<T extends Serializable> implements SourceFunction.SourceContext<T> {
 
 	private final Object lock;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
index bd929da..de84860 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
 
-import java.io.Serializable;
+import java.io.IOException;
 import java.util.List;
 
+/**
+ * Collecting {@link Output} for {@link StreamRecord}.
+ */
 public class CollectorOutput<T> implements Output<StreamRecord<T>> {
 
 	private final List<StreamElement> list;
@@ -49,8 +52,13 @@ public class CollectorOutput<T> implements Output<StreamRecord<T>> {
 
 	@Override
 	public void collect(StreamRecord<T> record) {
-		T copied = SerializationUtils.deserialize(SerializationUtils.serialize((Serializable) record.getValue()));
-		list.add(record.copy(copied));
+		try {
+			ClassLoader cl = record.getClass().getClassLoader();
+			T copied = InstantiationUtil.deserializeObject(InstantiationUtil.serializeObject(record.getValue()), cl);
+			list.add(record.copy(copied));
+		} catch (IOException | ClassNotFoundException ex) {
+			throw new RuntimeException("Unable to deserialize record: " + record, ex);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
index 1745c46..26da5d3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
@@ -15,12 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 
 import java.util.Arrays;
 
+/**
+ * Tests for {@link OutputSelector}.
+ */
 public class EvenOddOutputSelector implements OutputSelector<Integer> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
index f16750d..ca21c0c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
@@ -15,10 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.OperatingSystem;
+
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
@@ -33,6 +35,9 @@ import java.io.FileOutputStream;
 
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link HDFSCopyFromLocal} and {@link HDFSCopyToLocal}.
+ */
 public class HDFSCopyUtilitiesTest {
 
 	@Rule

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index c6d0bce..8f4908a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.JobID;
@@ -28,16 +29,16 @@ import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.util.Migration;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -84,7 +85,6 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		setupMockTaskCreateKeyedBackend();
 	}
 
-
 	public KeyedOneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
 			final KeySelector<IN, K> keySelector,
@@ -148,7 +148,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 					timestamp,
 					streamFactory,
 					CheckpointOptions.forFullCheckpoint());
-			if(!keyedSnapshotRunnable.isDone()) {
+			if (!keyedSnapshotRunnable.isDone()) {
 				Thread runner = new Thread(keyedSnapshotRunnable);
 				runner.start();
 			}
@@ -181,7 +181,6 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 		}
 	}
 
-
 	private static boolean hasMigrationHandles(Collection<KeyedStateHandle> allKeyGroupsHandles) {
 		for (KeyedStateHandle handle : allKeyGroupsHandles) {
 			if (handle instanceof Migration) {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 41a083a..10c79d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.JobID;
@@ -24,11 +25,11 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -91,7 +92,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
 					final int numberOfKeyGroups = (Integer) invocationOnMock.getArguments()[1];
 					final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2];
 
-					if(keyedStateBackend != null) {
+					if (keyedStateBackend != null) {
 						keyedStateBackend.close();
 					}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 5d73015..db4fe1c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -17,17 +17,20 @@
 
 package org.apache.flink.streaming.util;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Simple test context for stream operators.
+ */
 public class MockContext<IN, OUT> {
 
 	private List<OUT> outputs;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
index 8c3226b..f19946c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -17,16 +17,19 @@
 
 package org.apache.flink.streaming.util;
 
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
 
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Mock {@link Output} for {@link StreamRecord}.
+ */
 public class MockOutput<T> implements Output<StreamRecord<T>> {
 	private Collection<T> outputs;
 
@@ -36,8 +39,13 @@ public class MockOutput<T> implements Output<StreamRecord<T>> {
 
 	@Override
 	public void collect(StreamRecord<T> record) {
-		T copied = SerializationUtils.deserialize(SerializationUtils.serialize((Serializable) record.getValue()));
-		outputs.add(copied);
+		try {
+			ClassLoader cl = record.getClass().getClassLoader();
+			T copied = InstantiationUtil.deserializeObject(InstantiationUtil.serializeObject(record.getValue()), cl);
+			outputs.add(copied);
+		} catch (IOException | ClassNotFoundException ex) {
+			throw new RuntimeException("Unable to deserialize record: " + record, ex);
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
index 05de3d0..c82ec7c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
@@ -15,10 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.functions.MapFunction;
 
+/**
+ * Identity mapper for {@code Integer}.
+ */
 public class NoOpIntMap implements MapFunction<Integer, Integer> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index ced8cca..652d016 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -86,7 +87,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 		processElement(new StreamRecord<>(value, timestamp));
 	}
 
-
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		operator.setKeyContextElement1(element);
 		oneInputOperator.processElement(element);

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 8011279..7e32723 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -15,8 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileInputStream;
@@ -26,11 +33,6 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1Serializer;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 
 /**
  * Util for writing/reading {@link org.apache.flink.streaming.runtime.tasks.OperatorStateHandles},
@@ -46,7 +48,7 @@ public class OperatorSnapshotUtil {
 
 	public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
 		FileOutputStream out = new FileOutputStream(path);
-		
+
 		try (DataOutputStream dos = new DataOutputStream(out)) {
 
 			dos.writeInt(state.getOperatorChainIndex());

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 36ade8c..5f17467 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -17,11 +17,6 @@
 
 package org.apache.flink.streaming.util;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RichFunction;
@@ -33,8 +28,17 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
-import static org.mockito.Mockito.*;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Utilities for {@link SourceFunction}.
+ */
 public class SourceFunctionUtil {
 
 	public static <T extends Serializable> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index b46ea66..6489448 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -15,11 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
-import com.google.common.collect.Iterables;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Iterables;
 import org.junit.Assert;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 5fbe371..d0bbf8f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -25,8 +25,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 /**
  * A test harness for testing a {@link TwoInputStreamOperator}.
  *
- * <p>
- * This mock task provides the operator with a basic runtime context and allows pushing elements
+ * <p>This mock task provides the operator with a basic runtime context and allows pushing elements
  * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements
  * and watermarks can be retrieved. you are free to modify these.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
index a14d113..317f2e3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
@@ -31,8 +31,12 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link TypeInformationSerializationSchema}.
+ */
 public class TypeInformationSerializationSchemaTest {
 
 	@Test
@@ -82,7 +86,7 @@ public class TypeInformationSerializationSchemaTest {
 	//  Test data types
 	// ------------------------------------------------------------------------
 
-	public static class MyPOJO {
+	private static class MyPOJO {
 
 		public int aField;
 		public List<Date> aList;

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
index 5e363e9..637c4ba 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
@@ -22,10 +22,15 @@ import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
+
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+/**
+ * Tests key selectors on arrays.
+ */
 public class ArrayKeySelectorTest {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
index 74b1d18..6081ed1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
@@ -19,12 +19,14 @@
 package org.apache.flink.streaming.util.serialization;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+
 import org.junit.Test;
 
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for the {@link SimpleStringSchema}.

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java
index 5e7dd35..2fb7964 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.util.typeutils;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -30,8 +28,14 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for field accessors.
+ */
 public class FieldAccessorTest {
 
 	// Note, that AggregationFunctionTest indirectly also tests FieldAccessors.
@@ -59,7 +63,6 @@ public class FieldAccessorTest {
 		assertEquals("b", f0.get(t));
 		assertEquals("b", t.f0);
 
-
 		FieldAccessor<Tuple2<String, Integer>, Integer> f1n = FieldAccessorFactory.getAccessor(tpeInfo, 1, null);
 		assertEquals(7, (int) f1n.get(t));
 		assertEquals(7, (int) t.f1);
@@ -81,11 +84,11 @@ public class FieldAccessorTest {
 		assertEquals("b", t.f0);
 
 		// This is technically valid (the ".0" is selecting the 0th field of a basic type).
-		FieldAccessor<Tuple2<String, Integer>, String> f0_0 = FieldAccessorFactory.getAccessor(tpeInfo, "f0.0", null);
-		assertEquals("b", f0_0.get(t));
+		FieldAccessor<Tuple2<String, Integer>, String> f0f0 = FieldAccessorFactory.getAccessor(tpeInfo, "f0.0", null);
+		assertEquals("b", f0f0.get(t));
 		assertEquals("b", t.f0);
-		t = f0_0.set(t, "cc");
-		assertEquals("cc", f0_0.get(t));
+		t = f0f0.set(t, "cc");
+		assertEquals("cc", f0f0.get(t));
 		assertEquals("cc", t.f0);
 
 	}
@@ -103,7 +106,7 @@ public class FieldAccessorTest {
 	public void testTupleInTuple() {
 		Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
 		TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo =
-				(TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t);
+				(TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>) TypeExtractor.getForObject(t);
 
 		FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = FieldAccessorFactory
 			.getAccessor(tpeInfo, "f0", null);
@@ -148,6 +151,9 @@ public class FieldAccessorTest {
 		FieldAccessorFactory.getAccessor(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class), 2, null);
 	}
 
+	/**
+	 * POJO.
+	 */
 	public static class Foo {
 		public int x;
 		public Tuple2<String, Long> t;
@@ -203,6 +209,9 @@ public class FieldAccessorTest {
 		FieldAccessorFactory.getAccessor(tpeInfo, "illegal.illegal.illegal", null);
 	}
 
+	/**
+	 * POJO for testing field access.
+	 */
 	public static class Inner {
 		public long x;
 		public boolean b;
@@ -220,16 +229,19 @@ public class FieldAccessorTest {
 
 		@Override
 		public String toString() {
-			return ((Long)x).toString() + ", " + b;
+			return ((Long) x).toString() + ", " + b;
 		}
 	}
 
+	/**
+	 * POJO containing POJO.
+	 */
 	public static class Outer {
 		public int a;
 		public Inner i;
 		public short b;
 
-		public Outer(){}
+		public Outer() {}
 
 		public Outer(int a, Inner i, short b) {
 			this.a = a;
@@ -239,13 +251,13 @@ public class FieldAccessorTest {
 
 		@Override
 		public String toString() {
-			return a+", "+i.toString()+", "+b;
+			return a + ", " + i.toString() + ", " + b;
 		}
 	}
 
 	@Test
 	public void testPojoInPojo() {
-		Outer o = new Outer(10, new Inner(4L), (short)12);
+		Outer o = new Outer(10, new Inner(4L), (short) 12);
 		PojoTypeInfo<Outer> tpeInfo = (PojoTypeInfo<Outer>) TypeInformation.of(Outer.class);
 
 		FieldAccessor<Outer, Long> fix = FieldAccessorFactory.getAccessor(tpeInfo, "i.x", null);
@@ -268,21 +280,19 @@ public class FieldAccessorTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testArray() {
-		int[] a = new int[]{3,5};
+		int[] a = new int[]{3, 5};
 		FieldAccessor<int[], Integer> fieldAccessor =
 				(FieldAccessor<int[], Integer>) (Object)
 						FieldAccessorFactory.getAccessor(PrimitiveArrayTypeInfo.getInfoFor(a.getClass()), 1, null);
 
 		assertEquals(Integer.class, fieldAccessor.getFieldType().getTypeClass());
 
-		assertEquals((Integer)a[1], fieldAccessor.get(a));
+		assertEquals((Integer) a[1], fieldAccessor.get(a));
 
 		a = fieldAccessor.set(a, 6);
-		assertEquals((Integer)a[1], fieldAccessor.get(a));
-
-
+		assertEquals((Integer) a[1], fieldAccessor.get(a));
 
-		Integer[] b = new Integer[]{3,5};
+		Integer[] b = new Integer[]{3, 5};
 		FieldAccessor<Integer[], Integer> fieldAccessor2 =
 				(FieldAccessor<Integer[], Integer>) (Object)
 						FieldAccessorFactory.getAccessor(BasicArrayTypeInfo.getInfoFor(b.getClass()), 1, null);
@@ -295,6 +305,9 @@ public class FieldAccessorTest {
 		assertEquals(b[1], fieldAccessor2.get(b));
 	}
 
+	/**
+	 * POJO with array.
+	 */
 	public static class ArrayInPojo {
 		public long x;
 		public int[] arr;
@@ -311,8 +324,8 @@ public class FieldAccessorTest {
 
 	@Test
 	public void testArrayInPojo() {
-		ArrayInPojo o = new ArrayInPojo(10L, new int[]{3,4,5}, 12);
-		PojoTypeInfo<ArrayInPojo> tpeInfo = (PojoTypeInfo<ArrayInPojo>)TypeInformation.of(ArrayInPojo.class);
+		ArrayInPojo o = new ArrayInPojo(10L, new int[]{3, 4, 5}, 12);
+		PojoTypeInfo<ArrayInPojo> tpeInfo = (PojoTypeInfo<ArrayInPojo>) TypeInformation.of(ArrayInPojo.class);
 
 		FieldAccessor<ArrayInPojo, Integer> fix = FieldAccessorFactory.getAccessor(tpeInfo, "arr.1", null);
 		assertEquals(4, (int) fix.get(o));

http://git-wip-us.apache.org/repos/asf/flink/blob/12b4185c/tools/maven/strict-checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/strict-checkstyle.xml b/tools/maven/strict-checkstyle.xml
index 9c58917..0c11aa9 100644
--- a/tools/maven/strict-checkstyle.xml
+++ b/tools/maven/strict-checkstyle.xml
@@ -36,7 +36,7 @@ This file is based on the checkstyle file of Apache Beam.
   <!--</module>-->
 
   <module name="NewlineAtEndOfFile">
-    <!-- windows can use \n\r vs \n, so enforce the most used one ie UNIx style -->
+    <!-- windows can use \r\n vs \n, so enforce the most used one ie UNIx style -->
     <property name="lineSeparator" value="lf" />
   </module>
 
@@ -84,7 +84,7 @@ This file is based on the checkstyle file of Apache Beam.
   -->
 
   <module name="FileLength">
-    <property name="max" value="2500"/>
+    <property name="max" value="3000"/>
   </module>
 
   <!-- All Java AST specific tests live under TreeWalker module. -->
@@ -197,9 +197,11 @@ This file is based on the checkstyle file of Apache Beam.
     <module name="ImportOrder">
       <!-- Checks for out of order import statements. -->
       <property name="severity" value="error"/>
-      <!-- This ensures that static imports go first. -->
-      <property name="option" value="top"/>
+      <!-- Flink imports first, then other imports, then javax and java imports, then static imports. -->
+      <property name="groups" value="org.apache.flink,*,javax,java"/>
+      <property name="separated" value="true"/>
       <property name="sortStaticImportsAlphabetically" value="true"/>
+      <property name="option" value="bottom"/>
       <property name="tokens" value="STATIC_IMPORT, IMPORT"/>
       <message key="import.ordering"
                value="Import {0} appears after other imports that it should precede"/>
@@ -478,6 +480,18 @@ This file is based on the checkstyle file of Apache Beam.
 
     -->
 
+    <module name="EmptyLineSeparator">
+	  <!-- Checks for empty line separator between tokens. The only
+           excluded token is VARIABLE_DEF, allowing class fields to
+           be declared on consecutive lines.
+      -->
+	  <property name="allowMultipleEmptyLines" value="false"/>
+	  <property name="allowMultipleEmptyLinesInsideClassMembers" value="false"/>
+	  <property name="tokens" value="PACKAGE_DEF, IMPORT, CLASS_DEF,
+	    INTERFACE_DEF, ENUM_DEF, STATIC_INIT, INSTANCE_INIT, METHOD_DEF,
+	    CTOR_DEF"/>
+    </module>
+
     <module name="WhitespaceAround">
       <!-- Checks that various tokens are surrounded by whitespace.
            This includes most binary operators and keywords followed


Mime
View raw message