flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [15/23] flink git commit: [FLINK-6107] Enable trailing whitespace check in streaming checkstyle
Date Wed, 26 Apr 2017 10:07:37 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 912b448..2a6a723 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -60,19 +60,19 @@ import static org.mockito.Mockito.when;
 
 @SuppressWarnings("serial")
 public class StreamSourceOperatorTest {
-	
+
 	@Test
 	public void testEmitMaxWatermarkForFiniteSource() throws Exception {
 
 		// regular stream source operator
-		StreamSource<String, FiniteSource<String>> operator = 
+		StreamSource<String, FiniteSource<String>> operator =
 				new StreamSource<>(new FiniteSource<String>());
-		
+
 		final List<StreamElement> output = new ArrayList<>();
-		
+
 		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
 		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
-		
+
 		assertEquals(1, output.size());
 		assertEquals(Watermark.MAX_WATERMARK, output.get(0));
 	}
@@ -92,23 +92,23 @@ public class StreamSourceOperatorTest {
 
 		// run and exit
 		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
-		
+
 		assertTrue(output.isEmpty());
 	}
-	
+
 	@Test
 	public void testNoMaxWatermarkOnAsyncCancel() throws Exception {
 
 		final List<StreamElement> output = new ArrayList<>();
 		final Thread runner = Thread.currentThread();
-		
+
 		// regular stream source operator
 		final StreamSource<String, InfiniteSource<String>> operator =
 				new StreamSource<>(new InfiniteSource<String>());
 
-		
+
 		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
-		
+
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
 			@Override
@@ -120,7 +120,7 @@ public class StreamSourceOperatorTest {
 				runner.interrupt();
 			}
 		}.start();
-		
+
 		// run and wait to be canceled
 		try {
 			operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
@@ -289,7 +289,7 @@ public class StreamSourceOperatorTest {
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		cfg.setStateBackend(new MemoryStateBackend());
-		
+
 		cfg.setTimeCharacteristic(timeChar);
 
 		Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
@@ -320,7 +320,7 @@ public class StreamSourceOperatorTest {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	private static final class FiniteSource<T> implements SourceFunction<T>, StoppableFunction {
 
 		@Override
@@ -336,7 +336,7 @@ public class StreamSourceOperatorTest {
 	private static final class InfiniteSource<T> implements SourceFunction<T>, StoppableFunction {
 
 		private volatile boolean running = true;
-		
+
 		@Override
 		public void run(SourceContext<T> ctx) throws Exception {
 			while (running) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index f41ec02..6772db4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -43,13 +43,13 @@ public class StreamTaskTimerTest {
 	@Test
 	public void testOpenCloseAndTimestamps() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
-		
+
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
 				mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 		testHarness.setupOutputForSingletonOperatorChain();
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
-		
+
 		StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
 		streamConfig.setStreamOperator(mapOperator);
 
@@ -78,7 +78,7 @@ public class StreamTaskTimerTest {
 		assertEquals("Trigger timer thread did not properly shut down",
 				0, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
 	}
-	
+
 	@Test
 	public void checkScheduledTimestampe() {
 		try {
@@ -141,11 +141,11 @@ public class StreamTaskTimerTest {
 	}
 
 	private static class ValidatingProcessingTimeCallback implements ProcessingTimeCallback {
-		
+
 		static int numInSequence;
-		
+
 		private final AtomicReference<Throwable> errorRef;
-		
+
 		private final long expectedTimestamp;
 		private final int expectedInSequence;
 
@@ -167,9 +167,9 @@ public class StreamTaskTimerTest {
 			}
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	public static class DummyMapFunction<T> implements MapFunction<T, T> {
 		@Override
 		public T map(T value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index 9ddea8c..f129c20 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -31,11 +31,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import static org.junit.Assert.*;
 
 public class TimestampsAndPeriodicWatermarksOperatorTest {
-	
+
 	@Test
 	public void testTimestampsAndPeriodicWatermarksOperator() throws Exception {
-		
-		final TimestampsAndPeriodicWatermarksOperator<Long> operator = 
+
+		final TimestampsAndPeriodicWatermarksOperator<Long> operator =
 				new TimestampsAndPeriodicWatermarksOperator<Long>(new LongExtractor());
 
 		OneInputStreamOperatorTestHarness<Long, Long> testHarness =
@@ -46,20 +46,20 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 		long currentTime = 0;
 
 		testHarness.open();
-		
+
 		testHarness.processElement(new StreamRecord<>(1L, 1));
 		testHarness.processElement(new StreamRecord<>(2L, 1));
 		testHarness.processWatermark(new Watermark(2)); // this watermark should be ignored
 		testHarness.processElement(new StreamRecord<>(3L, 3));
 		testHarness.processElement(new StreamRecord<>(4L, 3));
-		
+
 		// validate first part of the sequence. we poll elements until our
 		// watermark updates to "3", which must be the result of the "4" element.
 		{
 			ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
 			long nextElementValue = 1L;
 			long lastWatermark = -1L;
-			
+
 			while (lastWatermark < 3) {
 				if (output.size() > 0) {
 					Object next = output.poll();
@@ -67,7 +67,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 					Tuple2<Long, Long> update = validateElement(next, nextElementValue, lastWatermark);
 					nextElementValue = update.f0;
 					lastWatermark = update.f1;
-					
+
 					// check the invariant
 					assertTrue(lastWatermark < nextElementValue);
 				} else {
@@ -75,7 +75,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 					testHarness.setProcessingTime(currentTime);
 				}
 			}
-			
+
 			output.clear();
 		}
 
@@ -99,7 +99,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 					Tuple2<Long, Long> update = validateElement(next, nextElementValue, lastWatermark);
 					nextElementValue = update.f0;
 					lastWatermark = update.f1;
-					
+
 					// check the invariant
 					assertTrue(lastWatermark < nextElementValue);
 				} else {
@@ -110,7 +110,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 
 			output.clear();
 		}
-		
+
 		testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
 		assertEquals(Long.MAX_VALUE, ((Watermark) testHarness.getOutput().poll()).getTimestamp());
 	}
@@ -131,20 +131,20 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 		testHarness.open();
 
 		long[] values = { Long.MIN_VALUE, -1L, 0L, 1L, 2L, 3L, Long.MAX_VALUE };
-		
+
 		for (long value : values) {
 			testHarness.processElement(new StreamRecord<>(value));
 		}
 
 		ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
-		
+
 		for (long value: values) {
 			assertEquals(value, ((StreamRecord<?>) output.poll()).getTimestamp());
 		}
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	private Tuple2<Long, Long> validateElement(Object element, long nextElementValue, long currentWatermark) {
 		if (element instanceof StreamRecord) {
 			@SuppressWarnings("unchecked")
@@ -162,7 +162,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 			throw new IllegalArgumentException("unrecognized element: " + element);
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	private static class LongExtractor implements AssignerWithPeriodicWatermarks<Long> {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
index 07199ac..0333e93 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
@@ -35,15 +35,15 @@ public class TimestampsAndPunctuatedWatermarksOperatorTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testTimestampsAndPeriodicWatermarksOperator() throws Exception {
-		
-		final TimestampsAndPunctuatedWatermarksOperator<Tuple2<Long, Boolean>> operator = 
+
+		final TimestampsAndPunctuatedWatermarksOperator<Tuple2<Long, Boolean>> operator =
 				new TimestampsAndPunctuatedWatermarksOperator<>(new PunctuatedExtractor());
-		
+
 		OneInputStreamOperatorTestHarness<Tuple2<Long, Boolean>, Tuple2<Long, Boolean>> testHarness =
 				new OneInputStreamOperatorTestHarness<>(operator);
 
 		testHarness.open();
-		
+
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>(3L, true), 0L));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>(5L, false), 0L));
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>(4L, false), 0L));
@@ -58,10 +58,10 @@ public class TimestampsAndPunctuatedWatermarksOperatorTest {
 		testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
 
 		ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
-		
+
 		assertEquals(3L, ((StreamRecord<Tuple2<Long, Boolean>>) output.poll()).getTimestamp());
 		assertEquals(3L, ((Watermark) output.poll()).getTimestamp());
-		
+
 		assertEquals(5L, ((StreamRecord<Tuple2<Long, Boolean>>) output.poll()).getTimestamp());
 		assertEquals(4L, ((StreamRecord<Tuple2<Long, Boolean>>) output.poll()).getTimestamp());
 		assertEquals(4L, ((StreamRecord<Tuple2<Long, Boolean>>) output.poll()).getTimestamp());
@@ -102,12 +102,12 @@ public class TimestampsAndPunctuatedWatermarksOperatorTest {
 			assertEquals(value, ((StreamRecord<?>) output.poll()).getTimestamp());
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	private static class PunctuatedExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<Long, Boolean>> {
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public long extractTimestamp(Tuple2<Long, Boolean> element, long previousTimestamp) {
 			return element.f0;

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 6f0e881..a8d3154 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -80,14 +80,14 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
 	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
-	
+
 	private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
 		@Override
 		public Integer getKey(Integer value) {
 			return value;
 		}
 	};
-	
+
 	private final InternalIterableWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
 			new InternalIterableWindowFunction<>(new WindowFunction<Integer, Integer, Integer, TimeWindow>() {
 				@Override
@@ -117,7 +117,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		ClosureCleaner.clean(validatingIdentityFunction, false);
 		ClosureCleaner.clean(validatingIdentityProcessFunction, false);
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	@After
@@ -134,9 +134,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		assertTrue("Not all trigger threads where properly shut down",
 				StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	@Test
 	public void testInvalidParameters() {
 		try {
@@ -144,7 +144,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			assertInvalidParameter(10000L, -1L);
 			assertInvalidParameter(-1L, 1000L);
 			assertInvalidParameter(1000L, 2000L);
-			
+
 			// actual internal slide is too low here:
 			assertInvalidParameter(1000L, 999L);
 		}
@@ -153,12 +153,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testWindowSizeAndSlide() {
 		try {
 			AccumulatingProcessingTimeWindowOperator<String, String, String> op;
-			
+
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
 			assertEquals(5000, op.getWindowSize());
@@ -568,7 +568,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			List<Integer> result = extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 
 			assertEquals(6, result.size());
-			
+
 			Collections.sort(result);
 			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
 
@@ -898,7 +898,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			// inject some elements
 			final int numElements = 1000;
 			final int numElementsFirst = 700;
-			
+
 			for (int i = 0; i < numElementsFirst; i++) {
 				testHarness.processElement(new StreamRecord<>(i));
 			}
@@ -964,13 +964,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testKeyValueStateInWindowFunction() {
 		try {
 
 			StatefulFunction.globalCounts.clear();
-			
+
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
 					new AccumulatingProcessingTimeWindowOperator<>(
@@ -1008,7 +1008,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 			assertEquals(4, StatefulFunction.globalCounts.get(1).intValue());
 			assertEquals(4, StatefulFunction.globalCounts.get(2).intValue());
-			
+
 			testHarness.close();
 			op.dispose();
 		}
@@ -1017,13 +1017,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	private void assertInvalidParameter(long windowSize, long windowSlide) {
 		try {
 			new AccumulatingProcessingTimeWindowOperator<String, String, String>(
-					mockFunction, mockKeySelector, 
+					mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE,
 					windowSize, windowSlide);
 			fail("This should fail with an IllegalArgumentException");
@@ -1064,7 +1064,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				// the checks may fail
 				state.update(state.value() + 1);
 				globalCounts.put(key, state.value());
-				
+
 				out.collect(i);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 34eac9e..468f14c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -553,13 +553,13 @@ public class AllWindowTranslationTest {
 				.windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.aggregate(new DummyAggregationFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = 
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
 				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = 
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
 				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
@@ -581,13 +581,13 @@ public class AllWindowTranslationTest {
 				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.aggregate(new DummyAggregationFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = 
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
 				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = 
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
 				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
@@ -615,7 +615,7 @@ public class AllWindowTranslationTest {
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = 
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
 				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
@@ -643,7 +643,7 @@ public class AllWindowTranslationTest {
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = 
+		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
 				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
@@ -1507,7 +1507,7 @@ public class AllWindowTranslationTest {
 		@Override
 		public void apply(
 				TimeWindow window,
-				Iterable<Tuple2<String, Integer>> values, 
+				Iterable<Tuple2<String, Integer>> values,
 				Collector<Tuple3<String, String, Integer>> out) throws Exception {
 
 			for (Tuple2<String, Integer> in : values) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
index c0b20a3..8786c4e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutIfAbsentTest.java
@@ -25,15 +25,15 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class KeyMapPutIfAbsentTest {
-	
+
 	@Test
 	public void testPutIfAbsentUniqueKeysAndGrowth() {
 		try {
 			KeyMap<Integer, Integer> map = new KeyMap<>();
 			IntegerFactory factory = new IntegerFactory();
-			
+
 			final int numElements = 1000000;
-			
+
 			for (int i = 0; i < numElements; i++) {
 				factory.set(2 * i + 1);
 				map.putIfAbsent(i, factory);
@@ -43,7 +43,7 @@ public class KeyMapPutIfAbsentTest {
 				assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
 				assertTrue(map.size() <= map.getRehashThreshold());
 			}
-			
+
 			assertEquals(numElements, map.size());
 			assertEquals(numElements, map.traverseAndCountElements());
 			assertEquals(1 << 21, map.getCurrentTableCapacity());
@@ -51,7 +51,7 @@ public class KeyMapPutIfAbsentTest {
 			for (int i = 0; i < numElements; i++) {
 				assertEquals(2 * i + 1, map.get(i).intValue());
 			}
-			
+
 			for (int i = numElements - 1; i >= 0; i--) {
 				assertEquals(2 * i + 1, map.get(i).intValue());
 			}
@@ -66,13 +66,13 @@ public class KeyMapPutIfAbsentTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testPutIfAbsentDuplicateKeysAndGrowth() {
 		try {
 			KeyMap<Integer, Integer> map = new KeyMap<>();
 			IntegerFactory factory = new IntegerFactory();
-			
+
 			final int numElements = 1000000;
 
 			for (int i = 0; i < numElements; i++) {
@@ -102,13 +102,13 @@ public class KeyMapPutIfAbsentTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	private static class IntegerFactory implements KeyMap.LazyFactory<Integer> {
-		
+
 		private Integer toCreate;
-		
+
 		public void set(Integer toCreate) {
 			this.toCreate = toCreate;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
index 09c44fe..5b59bea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapPutTest.java
@@ -63,7 +63,7 @@ public class KeyMapPutTest {
 			int numContained = 0;
 			for (KeyMap.Entry<Integer, Integer> entry : map) {
 				numContained++;
-				
+
 				assertEquals(entry.getKey() * 2 + 1, entry.getValue().intValue());
 				assertFalse(bitset.get(entry.getKey()));
 				bitset.set(entry.getKey());
@@ -71,8 +71,8 @@ public class KeyMapPutTest {
 
 			assertEquals(numElements, numContained);
 			assertEquals(numElements, bitset.cardinality());
-			
-			
+
+
 			assertEquals(numElements, map.size());
 			assertEquals(numElements, map.traverseAndCountElements());
 			assertEquals(1 << 21, map.getCurrentTableCapacity());
@@ -105,13 +105,13 @@ public class KeyMapPutTest {
 				int expected = (i % 3 == 0) ? (2*i) : (2*i+1);
 				assertEquals(expected, map.get(i).intValue());
 			}
-			
+
 			assertEquals(numElements, map.size());
 			assertEquals(numElements, map.traverseAndCountElements());
 			assertEquals(1 << 21, map.getCurrentTableCapacity());
 			assertTrue(map.getLongestChainLength() <= 7);
 
-			
+
 			BitSet bitset = new BitSet();
 			int numContained = 0;
 			for (KeyMap.Entry<Integer, Integer> entry : map) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
index 49310df..c7fb108 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMapTest.java
@@ -27,7 +27,7 @@ import java.util.Random;
 import static org.junit.Assert.*;
 
 public class KeyMapTest {
-	
+
 	@Test
 	public void testInitialSizeComputation() {
 		try {
@@ -38,7 +38,7 @@ public class KeyMapTest {
 			assertEquals(6, map.getLog2TableCapacity());
 			assertEquals(24, map.getShift());
 			assertEquals(48, map.getRehashThreshold());
-			
+
 			map = new KeyMap<>(0);
 			assertEquals(64, map.getCurrentTableCapacity());
 			assertEquals(6, map.getLog2TableCapacity());
@@ -80,7 +80,7 @@ public class KeyMapTest {
 			assertEquals(7, map.getLog2TableCapacity());
 			assertEquals(23, map.getShift());
 			assertEquals(96, map.getRehashThreshold());
-			
+
 			// no negative number of elements
 			try {
 				new KeyMap<>(-1);
@@ -89,7 +89,7 @@ public class KeyMapTest {
 			catch (IllegalArgumentException e) {
 				// expected
 			}
-			
+
 			// check integer overflow
 			try {
 				map = new KeyMap<>(0x65715522);
@@ -115,17 +115,17 @@ public class KeyMapTest {
 		try {
 			final KeyMap<Integer, Integer> map = new KeyMap<>();
 			final Random rnd = new Random();
-			
+
 			final long seed = rnd.nextLong();
 			final int numElements = 10000;
-			
+
 			final HashMap<Integer, Integer> groundTruth = new HashMap<>();
-			
+
 			rnd.setSeed(seed);
 			for (int i = 0; i < numElements; i++) {
 				Integer key = rnd.nextInt();
 				Integer value = rnd.nextInt();
-				
+
 				if (rnd.nextBoolean()) {
 					groundTruth.put(key, value);
 					map.put(key, value);
@@ -139,7 +139,7 @@ public class KeyMapTest {
 				// skip these, evaluating it is tricky due to duplicates
 				rnd.nextInt();
 				rnd.nextBoolean();
-				
+
 				Integer expected = groundTruth.get(key);
 				if (expected == null) {
 					assertNull(map.get(key));
@@ -156,12 +156,12 @@ public class KeyMapTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testConjunctTraversal() {
 		try {
 			final Random rootRnd = new Random(654685486325439L);
-			
+
 			final int numMaps = 7;
 			final int numKeys = 1000000;
 
@@ -171,7 +171,7 @@ public class KeyMapTest {
 			for (int i = 0; i < numMaps; i++) {
 				maps[i] = new KeyMap<>();
 			}
-			
+
 			// ------ prepare probabilities for maps ------
 			final double[] probabilities = new double[numMaps];
 			final double[] probabilitiesTemp = new double[numMaps];
@@ -186,39 +186,39 @@ public class KeyMapTest {
 				// compensate for rounding errors
 				probabilities[numMaps - 1] = remainingProb;
 			}
-			
+
 			// ------ generate random elements ------
 			final long probSeed = rootRnd.nextLong();
 			final long keySeed = rootRnd.nextLong();
-			
+
 			final Random probRnd = new Random(probSeed);
 			final Random keyRnd = new Random(keySeed);
-			
+
 			final int maxStride = Integer.MAX_VALUE / numKeys;
-			
+
 			int totalNumElements = 0;
 			int nextKeyValue = 1;
-			
+
 			for (int i = 0; i < numKeys; i++) {
 				int numCopies = (nextKeyValue % 3) + 1;
 				System.arraycopy(probabilities, 0, probabilitiesTemp, 0, numMaps);
-				
+
 				double totalProb = 1.0;
 				for (int copy = 0; copy < numCopies; copy++) {
 					int pos = drawPosProportionally(probabilitiesTemp, totalProb, probRnd);
 					totalProb -= probabilitiesTemp[pos];
 					probabilitiesTemp[pos] = 0.0;
-					
+
 					Integer boxed = nextKeyValue;
 					Integer previous = maps[pos].put(boxed, boxed);
 					assertNull("Test problem - test does not assign unique maps", previous);
 				}
-				
+
 				totalNumElements += numCopies;
 				nextKeyValue += keyRnd.nextInt(maxStride) + 1;
 			}
-			
-			
+
+
 			// check that all maps contain the total number of elements
 			int numContained = 0;
 			for (KeyMap<?, ?> map : maps) {
@@ -228,13 +228,13 @@ public class KeyMapTest {
 
 			// ------ check that all elements can be found in the maps ------
 			keyRnd.setSeed(keySeed);
-			
+
 			numContained = 0;
 			nextKeyValue = 1;
 			for (int i = 0; i < numKeys; i++) {
 				int numCopiesExpected = (nextKeyValue % 3) + 1;
 				int numCopiesContained = 0;
-				
+
 				for (KeyMap<Integer, Integer> map : maps) {
 					Integer val = map.get(nextKeyValue);
 					if (val != null) {
@@ -242,10 +242,10 @@ public class KeyMapTest {
 						numCopiesContained++;
 					}
 				}
-				
+
 				assertEquals(numCopiesExpected, numCopiesContained);
 				numContained += numCopiesContained;
-				
+
 				nextKeyValue += keyRnd.nextInt(maxStride) + 1;
 			}
 			assertEquals(totalNumElements, numContained);
@@ -256,12 +256,12 @@ public class KeyMapTest {
 
 				private int key;
 				private int valueCount;
-				
+
 				@Override
 				public void startNewKey(Integer key) {
 					this.key = key;
 					this.valueCount = 0;
-					
+
 					keysStartedAndFinished[0]++;
 				}
 
@@ -277,13 +277,13 @@ public class KeyMapTest {
 					if (expected != valueCount) {
 						fail("Wrong count for key " + key + " ; expected=" + expected + " , count=" + valueCount);
 					}
-					
+
 					keysStartedAndFinished[1]++;
 				}
 			};
-			
+
 			KeyMap.traverseMaps(shuffleArray(maps, rootRnd), traversal, 17);
-			
+
 			assertEquals(numKeys, keysStartedAndFinished[0]);
 			assertEquals(numKeys, keysStartedAndFinished[1]);
 		}
@@ -298,9 +298,9 @@ public class KeyMapTest {
 		try {
 			KeyMap<String, String> map1 = new KeyMap<>(5);
 			KeyMap<String, String> map2 = new KeyMap<>(80);
-			
+
 			assertTrue(map1.getCurrentTableCapacity() < map2.getCurrentTableCapacity());
-			
+
 			assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map1) == 0);
 			assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map2) == 0);
 			assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map2) > 0);
@@ -311,12 +311,12 @@ public class KeyMapTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	private static int drawPosProportionally(double[] array, double totalProbability, Random rnd) {
 		double val = rnd.nextDouble() * totalProbability;
-		
+
 		double accum = 0;
 		for (int i = 0; i < array.length; i++) {
 			accum += array[i];
@@ -324,21 +324,21 @@ public class KeyMapTest {
 				return i;
 			}
 		}
-		
+
 		// in case of rounding errors
 		return array.length - 1;
 	}
-	
+
 	private static <E> E[] shuffleArray(E[] array, Random rnd) {
 		E[] target = Arrays.copyOf(array, array.length);
-		
+
 		for (int i = target.length - 1; i > 0; i--) {
 			int swapPos = rnd.nextInt(i + 1);
 			E temp = target[i];
 			target[i] = target[swapPos];
 			target[swapPos] = temp;
 		}
-		
+
 		return target;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 8f8667b..1137c6a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -686,7 +686,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-		
+
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
@@ -862,7 +862,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-		
+
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		testHarness.open();
@@ -1296,9 +1296,9 @@ public class WindowOperatorTest extends TestLogger {
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-		
+
 		testHarness.open();
-		
+
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 		ConcurrentLinkedQueue<Object> lateExpected = new ConcurrentLinkedQueue<>();
 
@@ -1366,7 +1366,7 @@ public class WindowOperatorTest extends TestLogger {
 
 		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-		
+
 		testHarness.open();
 
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
@@ -1500,7 +1500,7 @@ public class WindowOperatorTest extends TestLogger {
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
-		
+
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 		ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
 
@@ -1579,7 +1579,7 @@ public class WindowOperatorTest extends TestLogger {
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
-		
+
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 		ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
 
@@ -1671,7 +1671,7 @@ public class WindowOperatorTest extends TestLogger {
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
-		
+
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 		ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
 
@@ -1848,7 +1848,7 @@ public class WindowOperatorTest extends TestLogger {
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
-		
+
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
@@ -1949,7 +1949,7 @@ public class WindowOperatorTest extends TestLogger {
 			new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
 
 		testHarness.open();
-		
+
 		ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
 
 		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
index a1cea13..ddfb9e7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
@@ -30,7 +30,7 @@ public class BroadcastPartitionerTest {
 	private BroadcastPartitioner<Tuple> broadcastPartitioner1;
 	private BroadcastPartitioner<Tuple> broadcastPartitioner2;
 	private BroadcastPartitioner<Tuple> broadcastPartitioner3;
-	
+
 	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
index aa70e8a..06a1acd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
@@ -26,17 +26,17 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class RebalancePartitionerTest {
-	
+
 	private RebalancePartitioner<Tuple> distributePartitioner;
 	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
-	
+
 	@Before
 	public void setPartitioner() {
 		distributePartitioner = new RebalancePartitioner<Tuple>();
 	}
-	
+
 	@Test
 	public void testSelectChannelsLength() {
 		sd.setInstance(streamRecord);
@@ -44,7 +44,7 @@ public class RebalancePartitionerTest {
 		assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
 		assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
 	}
-	
+
 	@Test
 	public void testSelectChannelsInterval() {
 		sd.setInstance(streamRecord);

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
index d72c37b..7102f27 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java
@@ -59,17 +59,17 @@ import static org.junit.Assert.*;
 
 @SuppressWarnings("serial")
 public class RescalePartitionerTest extends TestLogger {
-	
+
 	private RescalePartitioner<Tuple> distributePartitioner;
 	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
-	
+
 	@Before
 	public void setPartitioner() {
 		distributePartitioner = new RescalePartitioner<Tuple>();
 	}
-	
+
 	@Test
 	public void testSelectChannelsLength() {
 		sd.setInstance(streamRecord);
@@ -77,7 +77,7 @@ public class RescalePartitionerTest extends TestLogger {
 		assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
 		assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
 	}
-	
+
 	@Test
 	public void testSelectChannelsInterval() {
 		sd.setInstance(streamRecord);

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
index 0f42a65..2012c94 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
@@ -37,20 +37,20 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class StreamElementSerializerTest {
-	
+
 	@Test
 	public void testDeepDuplication() {
 		@SuppressWarnings("unchecked")
 		TypeSerializer<Long> serializer1 = (TypeSerializer<Long>) mock(TypeSerializer.class);
-		
+
 		@SuppressWarnings("unchecked")
 		TypeSerializer<Long> serializer2 = (TypeSerializer<Long>) mock(TypeSerializer.class);
-		
+
 		when(serializer1.duplicate()).thenReturn(serializer2);
-		
+
 		StreamElementSerializer<Long> streamRecSer =
 				new StreamElementSerializer<Long>(serializer1);
-		
+
 		assertEquals(serializer1, streamRecSer.getContainedTypeSerializer());
 
 		StreamElementSerializer<Long> copy = streamRecSer.duplicate();
@@ -62,12 +62,12 @@ public class StreamElementSerializerTest {
 	public void testBasicProperties() {
 		StreamElementSerializer<Long> streamRecSer =
 				new StreamElementSerializer<Long>(LongSerializer.INSTANCE);
-		
+
 		assertFalse(streamRecSer.isImmutableType());
 		assertEquals(Long.class, streamRecSer.createInstance().getValue().getClass());
 		assertEquals(-1L, streamRecSer.getLength());
 	}
-	
+
 	@Test
 	public void testSerialization() throws Exception {
 		final StreamElementSerializer<String> serializer =
@@ -88,20 +88,20 @@ public class StreamElementSerializerTest {
 		Watermark negativeWatermark = new Watermark(-4647654567676555876L);
 		assertEquals(negativeWatermark, serializeAndDeserialize(negativeWatermark, serializer));
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	private static <T, X extends StreamElement> X serializeAndDeserialize(
 			X record,
 			StreamElementSerializer<T> serializer) throws IOException {
-		
+
 		DataOutputSerializer output = new DataOutputSerializer(32);
 		serializer.serialize(record, output);
-		
+
 		// additional binary copy step
 		DataInputDeserializer copyInput = new DataInputDeserializer(output.getByteArray(), 0, output.length());
 		DataOutputSerializer copyOutput = new DataOutputSerializer(32);
 		serializer.copy(copyInput, copyOutput);
-		
+
 		DataInputDeserializer input = new DataInputDeserializer(copyOutput.getByteArray(), 0, copyOutput.length());
 		return (X) serializer.deserialize(input);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
index 5d34b74..08d9644 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
@@ -23,17 +23,17 @@ import org.junit.Test;
 import static org.junit.Assert.*;
 
 public class StreamRecordTest {
-	
+
 	@Test
 	public void testWithNoTimestamp() {
 		StreamRecord<String> record = new StreamRecord<>("test");
-		
+
 		assertTrue(record.isRecord());
 		assertFalse(record.isWatermark());
-		
+
 		assertFalse(record.hasTimestamp());
 		assertEquals("test", record.getValue());
-		
+
 //		try {
 //			record.getTimestamp();
 //			fail("should throw an exception");
@@ -42,13 +42,13 @@ public class StreamRecordTest {
 //		}
 		// for now, the "no timestamp case" returns Long.MIN_VALUE
 		assertEquals(Long.MIN_VALUE, record.getTimestamp());
-		
+
 		assertNotNull(record.toString());
 		assertTrue(record.hashCode() == new StreamRecord<>("test").hashCode());
 		assertTrue(record.equals(new StreamRecord<>("test")));
-		
+
 		assertEquals(record, record.asRecord());
-		
+
 		try {
 			record.asWatermark();
 			fail("should throw an exception");
@@ -66,11 +66,11 @@ public class StreamRecordTest {
 
 		assertTrue(record.hasTimestamp());
 		assertEquals(42L, record.getTimestamp());
-		
+
 		assertEquals("foo", record.getValue());
-		
+
 		assertNotNull(record.toString());
-		
+
 		assertTrue(record.hashCode() == new StreamRecord<>("foo", 42).hashCode());
 		assertTrue(record.hashCode() != new StreamRecord<>("foo").hashCode());
 
@@ -95,7 +95,7 @@ public class StreamRecordTest {
 		assertEquals(Long.MIN_VALUE, new StreamRecord<>("test", Long.MIN_VALUE).getTimestamp());
 		assertEquals(Long.MAX_VALUE, new StreamRecord<>("test", Long.MAX_VALUE).getTimestamp());
 	}
-	
+
 	@Test
 	public void testReplacePreservesTimestamp() {
 		StreamRecord<String> recNoTimestamp = new StreamRecord<>("o sole mio");
@@ -104,7 +104,7 @@ public class StreamRecordTest {
 
 		StreamRecord<String> recWithTimestamp = new StreamRecord<>("la dolce vita", 99);
 		StreamRecord<Integer> newRecWithTimestamp = recWithTimestamp.replace(17);
-		
+
 		assertTrue(newRecWithTimestamp.hasTimestamp());
 		assertEquals(99L, newRecWithTimestamp.getTimestamp());
 	}
@@ -113,12 +113,12 @@ public class StreamRecordTest {
 	public void testReplaceWithTimestampOverridesTimestamp() {
 		StreamRecord<String> record = new StreamRecord<>("la divina comedia");
 		assertFalse(record.hasTimestamp());
-		
+
 		StreamRecord<Double> newRecord = record.replace(3.14, 123);
 		assertTrue(newRecord.hasTimestamp());
 		assertEquals(123L, newRecord.getTimestamp());
 	}
-	
+
 	@Test
 	public void testCopy() {
 		StreamRecord<String> recNoTimestamp = new StreamRecord<String>("test");
@@ -142,16 +142,16 @@ public class StreamRecordTest {
 		recWithTimestamp.copyTo("test", recWithTimestampCopy);
 		assertEquals(recWithTimestamp, recWithTimestampCopy);
 	}
-	
+
 	@Test
 	public void testSetAndEraseTimestamps() {
 		StreamRecord<String> rec = new StreamRecord<String>("hello");
 		assertFalse(rec.hasTimestamp());
-		
+
 		rec.setTimestamp(13456L);
 		assertTrue(rec.hasTimestamp());
 		assertEquals(13456L, rec.getTimestamp());
-		
+
 		rec.eraseTimestamp();
 		assertFalse(rec.hasTimestamp());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index a21e58b..19e27f2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -64,7 +64,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
 		int numInputChannelsPerGate,
 		TypeInformation<IN> inputType,
 		TypeInformation<OUT> outputType) {
-		
+
 		super(task, outputType);
 
 		this.inputType = inputType;

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
index 3bd7adf..c5fd682 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java
@@ -52,11 +52,11 @@ public class SourceStreamTaskStoppingTest {
 
 		final StoppableSourceStreamTask<Object, StoppableFailingSource> sourceTask = new StoppableSourceStreamTask<>();
 		sourceTask.stop();
-		
+
 		sourceTask.headOperator = new StoppableStreamSource<>(new StoppableFailingSource());
 		sourceTask.run();
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	private static class StoppableSource extends RichSourceFunction<Object> implements StoppableFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
index 53f77ca..f8d5393 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -71,7 +71,7 @@ public class StreamTaskCancellationBarrierTest {
 	/**
 	 * This test verifies (for onw input tasks) that the Stream tasks react the following way to
 	 * receiving a checkpoint cancellation barrier:
-	 * 
+	 *
 	 *   - send a "decline checkpoint" notification out (to the JobManager)
 	 *   - emit a cancellation barrier downstream
 	 */
@@ -167,7 +167,7 @@ public class StreamTaskCancellationBarrierTest {
 
 		private final Object lock = new Object();
 		private volatile boolean running = true;
-		
+
 		@Override
 		protected void init() throws Exception {
 			synchronized (lock) {
@@ -203,7 +203,7 @@ public class StreamTaskCancellationBarrierTest {
 
 	private static class UnionCoMap implements CoMapFunction<String, String, String> {
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public String map1(String value) throws Exception {
 			return value;

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index f34522b..546188e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -760,7 +760,7 @@ public class StreamTaskTest extends TestLogger {
 
 		LibraryCacheManager libCache = mock(LibraryCacheManager.class);
 		when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader());
-		
+
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
 		PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
@@ -814,16 +814,16 @@ public class StreamTaskTest extends TestLogger {
 			partitionProducerStateChecker,
 			executor);
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Test operators
 	// ------------------------------------------------------------------------
-	
+
 	public static class SlowlyDeserializingOperator extends StreamSource<Long, SourceFunction<Long>> {
 		private static final long serialVersionUID = 1L;
 
 		private volatile boolean canceled = false;
-		
+
 		public SlowlyDeserializingOperator() {
 			super(new MockSourceFunction());
 		}
@@ -847,7 +847,7 @@ public class StreamTaskTest extends TestLogger {
 		// slow deserialization
 		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
 			in.defaultReadObject();
-			
+
 			long delay = 500;
 			long deadline = System.currentTimeMillis() + delay;
 			do {
@@ -857,7 +857,7 @@ public class StreamTaskTest extends TestLogger {
 			} while ((delay = deadline - System.currentTimeMillis()) > 0);
 		}
 	}
-	
+
 	private static class MockSourceFunction implements SourceFunction<Long> {
 		private static final long serialVersionUID = 1L;
 
@@ -888,7 +888,7 @@ public class StreamTaskTest extends TestLogger {
 							return Mockito.mock(OperatorStateBackend.class);
 						}
 					});
-	
+
 				Mockito.when(stateBackendMock.createKeyedStateBackend(
 						Mockito.any(Environment.class),
 						Mockito.any(JobID.class),
@@ -955,7 +955,7 @@ public class StreamTaskTest extends TestLogger {
 	}
 
 	/**
-	 * A task that locks if cancellation attempts to cleanly shut down 
+	 * A task that locks if cancellation attempts to cleanly shut down
 	 */
 	public static class CancelLockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
 
@@ -995,11 +995,11 @@ public class StreamTaskTest extends TestLogger {
 			// do not interrupt the lock holder here, to simulate spawned threads that
 			// we cannot properly interrupt on cancellation
 		}
-		
+
 	}
 
 	/**
-	 * A task that locks if cancellation attempts to cleanly shut down 
+	 * A task that locks if cancellation attempts to cleanly shut down
 	 */
 	public static class CancelFailingTask extends StreamTask<String, AbstractStreamOperator<String>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index fb4f087..1f8638e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -312,7 +312,7 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 			latch.await();
 			timer.quiesceAndAwaitPending();
 
-			// should be able to immediately acquire the lock, since the task must have exited by now 
+			// should be able to immediately acquire the lock, since the task must have exited by now
 			assertTrue(scopeLock.tryLock());
 
 			// should be able to schedule more tasks (that never get executed)
@@ -327,7 +327,7 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 			// nothing should be scheduled right now
 			assertEquals(0, timer.getNumTasksScheduled());
 
-			// check that no asynchronous error was reported - that ensures that the newly scheduled 
+			// check that no asynchronous error was reported - that ensures that the newly scheduled
 			// triggerable did, in fact, not trigger
 			if (errorRef.get() != null) {
 				throw new Exception(errorRef.get());
@@ -397,7 +397,7 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 						latch.trigger();
 					}
 				}, lock);
-		
+
 		timeServiceProvider.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
 			@Override
 			public void onProcessingTime(long timestamp) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
index 220c1cd..1ae144c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
@@ -41,7 +41,7 @@ public class AbstractDeserializationSchemaTest {
 		TypeInformation<Tuple2<byte[], byte[]>> expected = TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){});
 		assertEquals(expected, type);
 	}
-	
+
 	@Test
 	public void testTypeExtractionTupleAnonymous() {
 		TypeInformation<Tuple2<byte[], byte[]>> type = new AbstractDeserializationSchema<Tuple2<byte[], byte[]>>() {
@@ -50,7 +50,7 @@ public class AbstractDeserializationSchemaTest {
 				throw new UnsupportedOperationException();
 			}
 		}.getProducedType();
-		
+
 		TypeInformation<Tuple2<byte[], byte[]>> expected = TypeInformation.of(new TypeHint<Tuple2<byte[], byte[]>>(){});
 		assertEquals(expected, type);
 	}
@@ -84,7 +84,7 @@ public class AbstractDeserializationSchemaTest {
 			// expected
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Test types
 	// ------------------------------------------------------------------------
@@ -96,7 +96,7 @@ public class AbstractDeserializationSchemaTest {
 			throw new UnsupportedOperationException();
 		}
 	}
-	
+
 	private static class JsonSchema extends AbstractDeserializationSchema<JSONPObject> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
index d9ad24d..fa68082 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectingSourceContext.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
 import java.util.Collection;
 
 public class CollectingSourceContext<T extends Serializable> implements SourceFunction.SourceContext<T> {
-	
+
 	private final Object lock;
 	private final Collection<T> collection;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
index c2763d8..5d73015 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class MockContext<IN, OUT> {
-	
+
 	private List<OUT> outputs;
 
 	private MockOutput<OUT> output;
@@ -54,7 +54,7 @@ public class MockContext<IN, OUT> {
 	public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) throws Exception {
 		return createAndExecuteForKeyedStream(operator, inputs, null, null);
 	}
-	
+
 	public static <IN, OUT, KEY> List<OUT> createAndExecuteForKeyedStream(
 				OneInputStreamOperator<IN, OUT> operator, List<IN> inputs,
 				KeySelector<IN, KEY> keySelector, TypeInformation<KEY> keyType) throws Exception {
@@ -64,7 +64,7 @@ public class MockContext<IN, OUT> {
 
 		testHarness.setup();
 		testHarness.open();
-		
+
 		operator.open();
 
 		for (IN in: inputs) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index ea03675..36ade8c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -39,17 +39,17 @@ public class SourceFunctionUtil {
 
 	public static <T extends Serializable> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
 		final List<T> outputs = new ArrayList<T>();
-		
+
 		if (sourceFunction instanceof RichFunction) {
 
 			AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class);
 			when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());
-			
+
 			RuntimeContext runtimeContext =  new StreamingRuntimeContext(
 					operator,
 					new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
 					new HashMap<String, Accumulator<?, ?>>());
-			
+
 			((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
 
 			((RichFunction) sourceFunction).open(new Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 7d791c0..5fbe371 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -37,7 +37,7 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends AbstractStr
 	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) throws Exception {
 		this(operator, 1, 1, 0);
 	}
-		
+
 	public TwoInputStreamOperatorTestHarness(
 			TwoInputStreamOperator<IN1, IN2, OUT> operator,
 			int maxParallelism,

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
index e722f53..a14d113 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
@@ -34,22 +34,22 @@ import java.util.List;
 import static org.junit.Assert.*;
 
 public class TypeInformationSerializationSchemaTest {
-	
+
 	@Test
 	public void testDeSerialization() {
 		try {
 			TypeInformation<MyPOJO> info = TypeExtractor.getForClass(MyPOJO.class);
-			
+
 			TypeInformationSerializationSchema<MyPOJO> schema =
 					new TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig());
-			
+
 			MyPOJO[] types = {
 					new MyPOJO(72, new Date(763784523L), new Date(88234L)),
 					new MyPOJO(-1, new Date(11111111111111L)),
 					new MyPOJO(42),
 					new MyPOJO(17, new Date(222763784523L))
 			};
-			
+
 			for (MyPOJO val : types) {
 				byte[] serialized = schema.serialize(val);
 				MyPOJO deser = schema.deserialize(serialized);
@@ -61,7 +61,7 @@ public class TypeInformationSerializationSchemaTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testSerializability() {
 		try {
@@ -77,16 +77,16 @@ public class TypeInformationSerializationSchemaTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Test data types
 	// ------------------------------------------------------------------------
-	
+
 	public static class MyPOJO {
-		
+
 		public int aField;
 		public List<Date> aList;
-		
+
 		public MyPOJO() {}
 
 		public MyPOJO(int iVal, Date... dates) {
@@ -103,7 +103,7 @@ public class TypeInformationSerializationSchemaTest {
 		public boolean equals(Object obj) {
 			if (obj instanceof MyPOJO) {
 				MyPOJO that = (MyPOJO) obj;
-				return this.aField == that.aField && (this.aList == null ? 
+				return this.aField == that.aField && (this.aList == null ?
 						that.aList == null :
 						that.aList != null && this.aList.equals(that.aList));
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
index 63375a7..5e363e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
@@ -33,19 +33,19 @@ public class ArrayKeySelectorTest {
 		try {
 			String[] array1 = { "a", "b", "c", "d", "e" };
 			String[] array2 = { "v", "w", "x", "y", "z" };
-			
+
 			KeySelectorUtil.ArrayKeySelector<String[]> singleFieldSelector =
 					KeySelectorUtil.getSelectorForArray(new int[] {1}, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
-			
+
 			assertEquals(new Tuple1<>("b"), singleFieldSelector.getKey(array1));
 			assertEquals(new Tuple1<>("w"), singleFieldSelector.getKey(array2));
 
 			KeySelectorUtil.ArrayKeySelector<String[]> twoFieldsSelector =
 					KeySelectorUtil.getSelectorForArray(new int[] {3, 0}, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
-			
+
 			assertEquals(new Tuple2<>("d", "a"), twoFieldsSelector.getKey(array1));
 			assertEquals(new Tuple2<>("y", "v"), twoFieldsSelector.getKey(array2));
-			
+
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -59,7 +59,7 @@ public class ArrayKeySelectorTest {
 			int[] array1 = { 1, 2, 3, 4, 5 };
 			int[] array2 = { -5, -4, -3, -2, -1, 0 };
 
-			KeySelectorUtil.ArrayKeySelector<int[]> singleFieldSelector = 
+			KeySelectorUtil.ArrayKeySelector<int[]> singleFieldSelector =
 					KeySelectorUtil.getSelectorForArray(new int[] {1}, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
 
 			assertEquals(new Tuple1<>(2), singleFieldSelector.getKey(array1));
@@ -67,7 +67,7 @@ public class ArrayKeySelectorTest {
 
 			KeySelectorUtil.ArrayKeySelector<int[]> twoFieldsSelector =
 					KeySelectorUtil.getSelectorForArray(new int[] {3, 0}, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
-			
+
 			assertEquals(new Tuple2<>(4, 1), twoFieldsSelector.getKey(array1));
 			assertEquals(new Tuple2<>(-2, -5), twoFieldsSelector.getKey(array2));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/tools/maven/strict-checkstyle.xml
----------------------------------------------------------------------
diff --git a/tools/maven/strict-checkstyle.xml b/tools/maven/strict-checkstyle.xml
index 7684a24..3285e5c 100644
--- a/tools/maven/strict-checkstyle.xml
+++ b/tools/maven/strict-checkstyle.xml
@@ -47,11 +47,11 @@ This file is based on the checkstyle file of Apache Beam.
     <property name="severity" value="error" />
   </module>
 
-  <!--<module name="RegexpSingleline">-->
-    <!--<property name="format" value="\s+$"/>-->
-    <!--<property name="message" value="Trailing whitespace"/>-->
-    <!--<property name="severity" value="error"/>-->
-  <!--</module>-->
+  <module name="RegexpSingleline">
+    <property name="format" value="\s+$"/>
+    <property name="message" value="Trailing whitespace"/>
+    <property name="severity" value="error"/>
+  </module>
 
   <module name="RegexpSingleline">
     <property name="format" value="Throwables.propagate\("/>


Mime
View raw message