flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [17/23] flink git commit: [FLINK-6107] Enable trailing whitespace check in streaming checkstyle
Date Wed, 26 Apr 2017 10:07:39 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index cc0bd64..70d76d4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -47,38 +47,38 @@ import static java.util.Objects.requireNonNull;
  */
 @Internal
 @Deprecated
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function> 
-		extends AbstractUdfStreamOperator<OUT, F> 
+public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE, F extends Function>
+		extends AbstractUdfStreamOperator<OUT, F>
 		implements OneInputStreamOperator<IN, OUT>, ProcessingTimeCallback {
-	
+
 	private static final long serialVersionUID = 3245500864882459867L;
-	
+
 	private static final long MIN_SLIDE_TIME = 50;
-	
+
 	// ----- fields for operator parametrization -----
-	
+
 	private final Function function;
 	private final KeySelector<IN, KEY> keySelector;
-	
+
 	private final TypeSerializer<KEY> keySerializer;
 	private final TypeSerializer<STATE> stateTypeSerializer;
-	
+
 	private final long windowSize;
 	private final long windowSlide;
 	private final long paneSize;
 	private final int numPanesPerWindow;
-	
+
 	// ----- fields for operator functionality -----
-	
+
 	private transient AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;
-	
+
 	private transient TimestampedCollector<OUT> out;
-	
+
 	private transient RestoredState<IN, KEY, STATE, OUT> restoredState;
-	
+
 	private transient long nextEvaluationTime;
 	private transient long nextSlideTime;
-	
+
 	protected AbstractAlignedProcessingTimeWindowOperator(
 			F function,
 			KeySelector<IN, KEY> keySelector,
@@ -88,7 +88,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 			long windowSlide)
 	{
 		super(function);
-		
+
 		if (windowLength < MIN_SLIDE_TIME) {
 			throw new IllegalArgumentException("Window length must be at least " + MIN_SLIDE_TIME + " msecs");
 		}
@@ -98,14 +98,14 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		if (windowLength < windowSlide) {
 			throw new IllegalArgumentException("The window size must be larger than the window slide");
 		}
-		
+
 		final long paneSlide = ArithmeticUtils.gcd(windowLength, windowSlide);
 		if (paneSlide < MIN_SLIDE_TIME) {
 			throw new IllegalArgumentException(String.format(
 					"Cannot compute window of size %d msecs sliding by %d msecs. " +
 							"The unit of grouping is too small: %d msecs", windowLength, windowSlide, paneSlide));
 		}
-		
+
 		this.function = requireNonNull(function);
 		this.keySelector = requireNonNull(keySelector);
 		this.keySerializer = requireNonNull(keySerializer);
@@ -115,8 +115,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		this.paneSize = paneSlide;
 		this.numPanesPerWindow = MathUtils.checkedDownCast(windowLength / paneSlide);
 	}
-	
-	
+
+
 	protected abstract AbstractKeyedTimePanes<IN, KEY, STATE, OUT> createPanes(
 			KeySelector<IN, KEY> keySelector, Function function);
 
@@ -129,7 +129,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		super.open();
 
 		out = new TimestampedCollector<>(output);
-		
+
 		// decide when to first compute the window and when to slide it
 		// the values should align with the start of time (that is, the UNIX epoch, not the big bang)
 		final long now = getProcessingTimeService().getCurrentProcessingTime();
@@ -137,7 +137,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		nextSlideTime = now + paneSize - (now % paneSize);
 
 		final long firstTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
-		
+
 		// check if we restored state and if we need to fire some windows based on that restored state
 		if (restoredState == null) {
 			// initial empty state: create empty panes that gather the elements per slide
@@ -146,12 +146,12 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		else {
 			// restored state
 			panes = restoredState.panes;
-			
+
 			long nextPastEvaluationTime = restoredState.nextEvaluationTime;
 			long nextPastSlideTime = restoredState.nextSlideTime;
 			long nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime);
 			int numPanesRestored = panes.getNumPanes();
-			
+
 			// fire windows from the past as long as there are more panes with data and as long
 			// as the missed trigger times have not caught up with the presence
 			while (numPanesRestored > 0 && nextPastTriggerTime < firstTriggerTime) {
@@ -160,7 +160,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 					computeWindow(nextPastTriggerTime);
 					nextPastEvaluationTime += windowSlide;
 				}
-				
+
 				// evaluate slide from the past
 				if (nextPastTriggerTime == nextPastSlideTime) {
 					panes.slidePanes(numPanesPerWindow);
@@ -179,7 +179,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	@Override
 	public void close() throws Exception {
 		super.close();
-		
+
 		// early stop the triggering thread, so it does not attempt to return any more data
 		stopTriggers();
 	}
@@ -187,7 +187,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	@Override
 	public void dispose() throws Exception {
 		super.dispose();
-		
+
 		// acquire the lock during shutdown, to prevent trigger calls at the same time
 		// fail-safe stop of the triggering thread (in case of an error)
 		stopTriggers();
@@ -198,7 +198,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 			panes.dispose();
 		}
 	}
-	
+
 	private void stopTriggers() {
 		// reset the action timestamps. this makes sure any pending triggers will not evaluate
 		nextEvaluationTime = -1L;
@@ -208,7 +208,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	// ------------------------------------------------------------------------
 	//  Receiving elements and triggers
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		panes.addElementToLatestPane(element.getValue());
@@ -234,7 +234,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
 		getProcessingTimeService().registerTimer(nextTriggerTime, this);
 	}
-	
+
 	private void computeWindow(long timestamp) throws Exception {
 		out.setAbsoluteTimestamp(timestamp);
 		panes.truncatePanes(numPanesPerWindow);
@@ -293,7 +293,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	public long getPaneSize() {
 		return paneSize;
 	}
-	
+
 	public int getNumPanesPerWindow() {
 		return numPanesPerWindow;
 	}
@@ -309,7 +309,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return "Window (processing time) (length=" + windowSize + ", slide=" + windowSlide + ')';
@@ -317,7 +317,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 
 	// ------------------------------------------------------------------------
 	// ------------------------------------------------------------------------
-	
+
 	private static final class RestoredState<IN, KEY, STATE, OUT> {
 
 		final AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes;

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
index 037b4d7..76f2a97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
@@ -35,11 +35,11 @@ import java.util.Iterator;
  */
 @Internal
 public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
-	
+
 	private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 0x0FF1CE42;
 
 	private static final int BEGIN_OF_PANE_MAGIC_NUMBER = 0xBADF00D5;
-	
+
 	/** The latest time pane. */
 	protected KeyMap<Key, Aggregate> latestPane = new KeyMap<>();
 
@@ -51,19 +51,19 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
 	public abstract void addElementToLatestPane(Type element) throws Exception;
 
 	public abstract void evaluateWindow(Collector<Result> out, TimeWindow window, AbstractStreamOperator<Result> operator) throws Exception;
-	
-	
+
+
 	public void dispose() {
 		// since all is heap data, there is no need to clean up anything
 		latestPane = null;
 		previousPanes.clear();
 	}
-	
+
 	public int getNumPanes() {
 		return previousPanes.size() + 1;
 	}
-	
-	
+
+
 	public void slidePanes(int panesToKeep) {
 		if (panesToKeep > 1) {
 			// the current pane becomes the latest previous pane
@@ -78,13 +78,13 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
 		// we need a new latest pane
 		latestPane = new KeyMap<>();
 	}
-	
+
 	public void truncatePanes(int numToRetain) {
 		while (previousPanes.size() >= numToRetain) {
 			previousPanes.removeFirst();
 		}
 	}
-	
+
 	protected void traverseAllPanes(KeyMap.TraversalEvaluator<Key, Aggregate> traversal, long traversalPass) throws Exception{
 		// gather all panes in an array (faster iterations)
 		@SuppressWarnings({"unchecked", "rawtypes"})
@@ -94,7 +94,7 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
 		// let the maps make a coordinated traversal and evaluate the window function per contained key
 		KeyMap.traverseMaps(panes, traversal, traversalPass);
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Serialization and de-serialization
 	// ------------------------------------------------------------------------
@@ -105,16 +105,16 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
 			final TypeSerializer<Aggregate> aggSerializer) throws IOException
 	{
 		output.writeInt(BEGIN_OF_STATE_MAGIC_NUMBER);
-		
+
 		int numPanes = getNumPanes();
 		output.writeInt(numPanes);
-		
+
 		// write from the past
 		Iterator<KeyMap<Key, Aggregate>> previous = previousPanes.iterator();
 		for (int paneNum = 0; paneNum < numPanes; paneNum++) {
 			output.writeInt(BEGIN_OF_PANE_MAGIC_NUMBER);
 			KeyMap<Key, Aggregate> pane = (paneNum == numPanes - 1) ? latestPane : previous.next();
-			
+
 			output.writeInt(pane.size());
 			for (KeyMap.Entry<Key, Aggregate> entry : pane) {
 				keySerializer.serialize(entry.getKey(), output);
@@ -122,7 +122,7 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
 			}
 		}
 	}
-	
+
 	public void readFromInput(
 			final DataInputView input,
 			final TypeSerializer<Key> keySerializer,
@@ -130,26 +130,26 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
 	{
 		validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, input.readInt());
 		int numPanes = input.readInt();
-		
+
 		// read from the past towards the presence
 		while (numPanes > 0) {
 			validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, input.readInt());
 			KeyMap<Key, Aggregate> pane = (numPanes == 1) ? latestPane : new KeyMap<Key, Aggregate>();
-			
+
 			final int numElementsInPane = input.readInt();
 			for (int i = numElementsInPane - 1; i >= 0; i--) {
 				Key k = keySerializer.deserialize(input);
 				Aggregate a = aggSerializer.deserialize(input);
 				pane.put(k, a);
 			}
-			
+
 			if (numPanes > 1) {
 				previousPanes.addLast(pane);
 			}
 			numPanes--;
 		}
 	}
-	
+
 	private static void validateMagicNumber(int expected, int found) throws IOException {
 		if (expected != found) {
 			throw new IOException("Corrupt state stream - wrong magic number. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 954dc10..5d95cdc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -45,7 +45,7 @@ import java.util.ArrayList;
  */
 @Internal
 public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyedTimePanes<Type, Key, ArrayList<Type>, Result> {
-	
+
 	private final KeySelector<Type, Key> keySelector;
 
 	private final KeyMap.LazyFactory<ArrayList<Type>> listFactory = getListFactory();
@@ -58,10 +58,10 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	 * IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries
 	 * have (zero).
 	 */
-	private long evaluationPass = 1L;   
+	private long evaluationPass = 1L;
 
 	// ------------------------------------------------------------------------
-	
+
 	public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
 		this.keySelector = keySelector;
 		this.function = function;
@@ -97,28 +97,28 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 					function, window, out, operator, context);
 			traverseAllPanes(evaluator, evaluationPass);
 		}
-		
+
 		evaluationPass++;
 	}
 
 	// ------------------------------------------------------------------------
 	//  Running a window function in a map traversal
 	// ------------------------------------------------------------------------
-	
+
 	static final class WindowFunctionTraversal<Key, Type, Result> implements KeyMap.TraversalEvaluator<Key, ArrayList<Type>> {
 
 		private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
-		
+
 		private final UnionIterator<Type> unionIterator;
-		
+
 		private final Collector<Result> out;
 
 		private final TimeWindow window;
-		
+
 		private final AbstractStreamOperator<Result> contextOperator;
 
 		private Key currentKey;
-		
+
 		private AccumulatingKeyedTimePanesContext context;
 
 		WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
@@ -150,11 +150,11 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 			function.process(currentKey, window, context, unionIterator, out);
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Lazy factory for lists (put if absent)
 	// ------------------------------------------------------------------------
-	
+
 	@SuppressWarnings("unchecked")
 	private static <V> KeyMap.LazyFactory<ArrayList<V>> getListFactory() {
 		return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 5072374..d71f80e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -43,7 +43,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 
 	private static final long serialVersionUID = 7305948082830843475L;
 
-	
+
 	public AccumulatingProcessingTimeWindowOperator(
 			InternalWindowFunction<Iterable<IN>, OUT, KEY, TimeWindow> function,
 			KeySelector<IN, KEY> keySelector,
@@ -60,7 +60,7 @@ public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT>
 	protected AccumulatingKeyedTimePanes<IN, KEY, OUT> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
 		@SuppressWarnings("unchecked")
 		InternalWindowFunction<Iterable<IN>, OUT, KEY, Window> windowFunction = (InternalWindowFunction<Iterable<IN>, OUT, KEY, Window>) function;
-		
+
 		return new AccumulatingKeyedTimePanes<>(keySelector, windowFunction);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
index d26ab9d..66d41f1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -30,9 +30,9 @@ import org.apache.flink.util.Collector;
  */
 @Internal
 public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {
-	
+
 	private final KeySelector<Type, Key> keySelector;
-	
+
 	private final ReduceFunction<Type> reducer;
 
 	/**
@@ -42,7 +42,7 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
 	private long evaluationPass = 1L;
 
 	// ------------------------------------------------------------------------
-	
+
 	public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reducer) {
 		this.keySelector = keySelector;
 		this.reducer = reducer;
@@ -57,7 +57,7 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
 	}
 
 	@Override
-	public void evaluateWindow(Collector<Type> out, TimeWindow window, 
+	public void evaluateWindow(Collector<Type> out, TimeWindow window,
 								AbstractStreamOperator<Type> operator) throws Exception {
 		if (previousPanes.isEmpty()) {
 			// optimized path for single pane case
@@ -70,22 +70,22 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
 			AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out, operator);
 			traverseAllPanes(evaluator, evaluationPass);
 		}
-		
+
 		evaluationPass++;
 	}
 
 	// ------------------------------------------------------------------------
 	//  The maps traversal that performs the final aggregation
 	// ------------------------------------------------------------------------
-	
+
 	static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> {
 
 		private final ReduceFunction<Type> function;
-		
+
 		private final Collector<Type> out;
-		
+
 		private final AbstractStreamOperator<Type> operator;
-		
+
 		private Type currentValue;
 
 		AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out,

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
index 51803e6..b83815c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -33,12 +33,12 @@ import org.apache.flink.api.java.functions.KeySelector;
  */
 @Internal
 @Deprecated
-public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
+public class AggregatingProcessingTimeWindowOperator<KEY, IN>
 		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, IN, ReduceFunction<IN>> {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 
-	
+
 	public AggregatingProcessingTimeWindowOperator(
 			ReduceFunction<IN> function,
 			KeySelector<IN, KEY> keySelector,
@@ -54,7 +54,7 @@ public class AggregatingProcessingTimeWindowOperator<KEY, IN>
 	protected AggregatingKeyedTimePanes<IN, KEY> createPanes(KeySelector<IN, KEY> keySelector, Function function) {
 		@SuppressWarnings("unchecked")
 		ReduceFunction<IN> windowFunction = (ReduceFunction<IN>) function;
-		
+
 		return new AggregatingKeyedTimePanes<IN, KEY>(keySelector, windowFunction);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
index 4e50d82..d251ddd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/KeyMap.java
@@ -44,33 +44,33 @@ import java.util.NoSuchElementException;
  */
 @Internal
 public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
-	
+
 	/** The minimum table capacity, 64 entries. */
 	private static final int MIN_CAPACITY = 0x40;
-	
+
 	/**
 	 * The maximum possible table capacity, the largest positive power of two in the 32bit signed
 	 * integer value range.
 	 */
 	private static final int MAX_CAPACITY = 0x40000000;
-	
+
 	/** The number of bits used for table addressing when table is at max capacity. */
 	private static final int FULL_BIT_RANGE = MathUtils.log2strict(MAX_CAPACITY);
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	/** The hash index, as an array of entries. */
 	private Entry<K, V>[] table;
-	
+
 	/** The number of bits by which the hash code is shifted right, to find the bucket. */
 	private int shift;
-	
+
 	/** The number of elements in the hash table. */
 	private int numElements;
-	
+
 	/** The number of elements above which the hash table needs to grow. */
 	private int rehashThreshold;
-	
+
 	/** The base-2 logarithm of the table capacity. */
 	private int log2size;
 
@@ -85,14 +85,14 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 
 	/**
 	 * Creates a new table with a capacity tailored to the given expected number of elements.
-	 * 
+	 *
 	 * @param expectedNumberOfElements The number of elements to tailor the capacity to.
 	 */
 	public KeyMap(int expectedNumberOfElements) {
 		if (expectedNumberOfElements < 0) {
 			throw new IllegalArgumentException("Invalid capacity: " + expectedNumberOfElements);
 		}
-		
+
 		// round up to the next power or two
 		// guard against too small capacity and integer overflows
 		int capacity = Integer.highestOneBit(expectedNumberOfElements) << 1;
@@ -113,17 +113,17 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 	 * Inserts the given value, mapped under the given key. If the table already contains a value for
 	 * the key, the value is replaced and returned. If no value is contained, yet, the function
 	 * returns null.
-	 * 
+	 *
 	 * @param key The key to insert.
 	 * @param value The value to insert.
 	 * @return The previously mapped value for the key, or null, if no value was mapped for the key.
-	 * 
+	 *
 	 * @throws java.lang.NullPointerException Thrown, if the key is null.
 	 */
 	public final V put(K key, V value) {
 		final int hash = hash(key);
 		final int slot = indexOf (hash);
-		
+
 		// search the chain from the slot
 		for (Entry<K, V> e = table[slot]; e != null; e = e.next) {
 			Object k;
@@ -151,7 +151,7 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 	 * @param factory The factory that produces the value, if no value is contained, yet, for the key.
 	 * @return The value in the map after this operation (either the previously contained value, or the
 	 *         newly created value).
-	 * 
+	 *
 	 * @throws java.lang.NullPointerException Thrown, if the key is null.
 	 */
 	public final V putIfAbsent(K key, LazyFactory<V> factory) {
@@ -178,14 +178,14 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 	 * Inserts or aggregates a value into the hash map. If the hash map does not yet contain the key,
 	 * this method inserts the value. If the table already contains the key (and a value) this
 	 * method will use the given ReduceFunction function to combine the existing value and the
-	 * given value to a new value, and store that value for the key. 
-	 * 
+	 * given value to a new value, and store that value for the key.
+	 *
 	 * @param key The key to map the value.
 	 * @param value The new value to insert, or aggregate with the existing value.
 	 * @param aggregator The aggregator to use if a value is already contained.
-	 * 
+	 *
 	 * @return The value in the map after this operation: Either the given value, or the aggregated value.
-	 * 
+	 *
 	 * @throws java.lang.NullPointerException Thrown, if the key is null.
 	 * @throws Exception The method forwards exceptions from the aggregation function.
 	 */
@@ -210,23 +210,23 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 
 	/**
 	 * Looks up the value mapped under the given key. Returns null if no value is mapped under this key.
-	 * 
+	 *
 	 * @param key The key to look up.
 	 * @return The value associated with the key, or null, if no value is found for the key.
-	 * 
+	 *
 	 * @throws java.lang.NullPointerException Thrown, if the key is null.
 	 */
 	public V get(K key) {
 		final int hash = hash(key);
 		final int slot = indexOf(hash);
-		
+
 		// search the chain from the slot
 		for (Entry<K, V> entry = table[slot]; entry != null; entry = entry.next) {
 			if (entry.hashCode == hash && entry.key.equals(key)) {
 				return entry.value;
 			}
 		}
-		
+
 		// not found
 		return null;
 	}
@@ -241,26 +241,26 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 			growTable();
 		}
 	}
-	
+
 	private int indexOf(int hashCode) {
 		return (hashCode >> shift) & (table.length - 1);
 	}
 
 	/**
 	 * Creates an iterator over the entries of this map.
-	 * 
+	 *
 	 * @return An iterator over the entries of this map.
 	 */
 	@Override
 	public Iterator<Entry<K, V>> iterator() {
 		return new Iterator<Entry<K, V>>() {
-			
+
 			private final Entry<K, V>[] tab = KeyMap.this.table;
-			
+
 			private Entry<K, V> nextEntry;
-			
+
 			private int nextPos = 0;
-			
+
 			@Override
 			public boolean hasNext() {
 				if (nextEntry != null) {
@@ -296,11 +296,11 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 			}
 		};
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Properties
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Gets the number of elements currently in the map.
 	 * @return The number of elements currently in the map.
@@ -329,33 +329,33 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 	/**
 	 * Gets the base-2 logarithm of the hash table capacity, as returned by
 	 * {@link #getCurrentTableCapacity()}.
-	 * 
+	 *
 	 * @return The base-2 logarithm of the hash table capacity.
 	 */
 	public int getLog2TableCapacity() {
 		return log2size;
 	}
-	
+
 	public int getRehashThreshold() {
 		return rehashThreshold;
 	}
-	
+
 	public int getShift() {
 		return shift;
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@SuppressWarnings("unchecked")
 	private Entry<K, V>[] allocateTable(int numElements) {
 		return (Entry<K, V>[]) new Entry<?, ?>[numElements];
 	}
-	
+
 	private void growTable() {
 		final int newSize = table.length << 1;
-				
+
 		// only grow if there is still space to grow the table
 		if (newSize > 0) {
 			final Entry<K, V>[] oldTable = this.table;
@@ -363,7 +363,7 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 
 			final int newShift = shift - 1;
 			final int newMask = newSize - 1;
-			
+
 			// go over all slots from the table. since we hash to adjacent positions in
 			// the new hash table, this is actually cache efficient
 			for (Entry<K, V> entry : oldTable) {
@@ -376,17 +376,17 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 					entry = nextEntry;
 				}
 			}
-			
+
 			this.table = newTable;
 			this.shift = newShift;
 			this.rehashThreshold = getRehashThreshold(newSize);
 			this.log2size += 1;
 		}
 	}
-	
+
 	private static int hash(Object key) {
 		int code = key.hashCode();
-		
+
 		// we need a strong hash function that generates diverse upper bits
 		// this hash function is more expensive than the "scramble" used by "java.util.HashMap",
 		// but required for this sort of hash table
@@ -397,7 +397,7 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 		code = (code + 0xfd7046c5) + (code << 3);
 		return (code ^ 0xb55a4f09) ^ (code >>> 16);
 	}
-	
+
 	private static int getRehashThreshold(int capacity) {
 		// divide before multiply, to avoid overflow
 		return capacity / 4 * 3;
@@ -410,26 +410,26 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 	/**
 	 * For testing only: Actively counts the number of entries, rather than using the
 	 * counter variable. This method has linear complexity, rather than constant.
-	 * 
+	 *
 	 * @return The counted number of entries.
 	 */
 	int traverseAndCountElements() {
 		int num = 0;
-		
+
 		for (Entry<?, ?> entry : table) {
 			while (entry != null) {
 				num++;
 				entry = entry.next;
 			}
 		}
-		
+
 		return num;
 	}
 
 	/**
 	 * For testing only: Gets the length of the longest overflow chain.
 	 * This method has linear complexity.
-	 * 
+	 *
 	 * @return The length of the longest overflow chain.
 	 */
 	int getLongestChainLength() {
@@ -451,15 +451,15 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 
 	/**
 	 * An entry in the hash table.
-	 * 
+	 *
 	 * @param <K> Type of the key.
 	 * @param <V> Type of the value.
 	 */
 	public static final class Entry<K, V> {
-		
+
 		final K key;
 		final int hashCode;
-		
+
 		V value;
 		Entry<K, V> next;
 		long touchedTag;
@@ -479,20 +479,20 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 			return value;
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Performs a traversal about logical the multi-map that results from the union of the
 	 * given maps. This method does not actually build a union of the map, but traverses the hash maps
 	 * together.
-	 * 
+	 *
 	 * @param maps The array uf maps whose union should be traversed.
 	 * @param visitor The visitor that is called for each key and all values.
 	 * @param touchedTag A tag that is used to mark elements that have been touched in this specific
 	 *                   traversal. Each successive traversal should supply a larger value for this
 	 *                   tag than the previous one.
-	 * 
+	 *
 	 * @param <K> The type of the map's key.
 	 * @param <V> The type of the map's value.
 	 */
@@ -504,21 +504,21 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 	{
 		// we need to work on the maps in descending size
 		Arrays.sort(maps, CapacityDescendingComparator.INSTANCE);
-		
+
 		final int[] shifts = new int[maps.length];
 		final int[] lowBitsMask = new int[maps.length];
 		final int numSlots = maps[0].table.length;
 		final int numTables = maps.length;
-		
+
 		// figure out how much each hash table collapses the entries
 		for (int i = 0; i < numTables; i++) {
 			shifts[i] = maps[0].log2size - maps[i].log2size;
 			lowBitsMask[i] = (1 << shifts[i]) - 1;
 		}
-		
+
 		// go over all slots (based on the largest hash table)
 		for (int pos = 0; pos < numSlots; pos++) {
-			
+
 			// for each slot, go over all tables, until the table does not have that slot any more
 			// for tables where multiple slots collapse into one, we visit that one when we process the
 			// latest of all slots that collapse to that one
@@ -534,14 +534,14 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 					// take only entries that have not been collected as part of other tables
 					if (entry.touchedTag < touchedTag) {
 						entry.touchedTag = touchedTag;
-						
+
 						final K key = entry.key;
 						final int hashCode = entry.hashCode;
 						visitor.startNewKey(key);
 						visitor.nextValue(entry.value);
-						
+
 						addEntriesFromChain(entry.next, visitor, key, touchedTag, hashCode);
-						
+
 						// go over the other hash tables and collect their entries for the key
 						for (int followupTable = rootTable + 1; followupTable < numTables; followupTable++) {
 							Entry<K, V> followupEntry = maps[followupTable].table[pos >> shifts[followupTable]];
@@ -552,13 +552,13 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 
 						visitor.keyDone();
 					}
-					
+
 					entry = entry.next;
 				}
 			}
 		}
 	}
-	
+
 	private static <K, V> void addEntriesFromChain(
 			Entry<K, V> entry,
 			TraversalEvaluator<K, V> visitor,
@@ -576,15 +576,15 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Comparator that defines a descending order on maps depending on their table capacity
 	 * and number of elements.
 	 */
 	static final class CapacityDescendingComparator implements Comparator<KeyMap<?, ?>> {
-		
+
 		static final CapacityDescendingComparator INSTANCE = new CapacityDescendingComparator();
-		
+
 		private CapacityDescendingComparator() {}
 
 
@@ -622,8 +622,8 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 	/**
 	 * A visitor for a traversal over the union of multiple hash maps. The visitor is
 	 * called for each key in the union of the maps and all values associated with that key
-	 * (one per map, but multiple across maps). 
-	 * 
+	 * (one per map, but multiple across maps).
+	 *
 	 * @param <K> The type of the key.
 	 * @param <V> The type of the value.
 	 */
@@ -631,7 +631,7 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 
 		/**
 		 * Called whenever the traversal starts with a new key.
-		 * 
+		 *
 		 * @param key The key traversed.
 		 * @throws Exception Method forwards all exceptions.
 		 */
@@ -639,7 +639,7 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 
 		/**
 		 * Called for each value found for the current key.
-		 * 
+		 *
 		 * @param value The next value.
 		 * @throws Exception Method forwards all exceptions.
 		 */
@@ -647,7 +647,7 @@ public class KeyMap<K, V> implements Iterable<KeyMap.Entry<K, V>> {
 
 		/**
 		 * Called when the traversal for the current key is complete.
-		 * 
+		 *
 		 * @throws Exception Method forwards all exceptions.
 		 */
 		void keyDone() throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
index 55749a1..229812c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/package-info.java
@@ -17,6 +17,6 @@
 
 /**
  * This package contains the operators that implement the various window operations
- * on data streams. 
+ * on data streams.
  */
 package org.apache.flink.streaming.runtime.operators.windowing;

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
index ad313fd..0ae737c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * Partitioner that forwards elements only to the locally running downstream operation.
- * 
+ *
  * @param <T> Type of the elements in the Stream
  */
 @Internal
@@ -36,11 +36,11 @@ public class ForwardPartitioner<T> extends StreamPartitioner<T> {
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
 		return returnArray;
 	}
-	
+
 	public StreamPartitioner<T> copy() {
 		return this;
 	}
-	
+
 	@Override
 	public String toString() {
 		return "FORWARD";

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
index dd2d4ff..a81f973 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 /**
  * Partitioner that distributes the data equally by cycling through the output
  * channels.
- * 
+ *
  * @param <T> Type of the elements in the Stream being rebalanced
  */
 @Internal
@@ -39,11 +39,11 @@ public class RebalancePartitioner<T> extends StreamPartitioner<T> {
 		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
 		return this.returnArray;
 	}
-	
+
 	public StreamPartitioner<T> copy() {
 		return this;
 	}
-	
+
 	@Override
 	public String toString() {
 		return "REBALANCE";

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
index a6d9d0d..9061523 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
@@ -55,11 +55,11 @@ public class RescalePartitioner<T> extends StreamPartitioner<T> {
 		this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
 		return this.returnArray;
 	}
-	
+
 	public StreamPartitioner<T> copy() {
 		return this;
 	}
-	
+
 	@Override
 	public String toString() {
 		return "RESCALE";

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
index 36c123a..87243fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 /**
  * Partitioner that distributes the data equally by selecting one output channel
  * randomly.
- * 
+ *
  * @param <T>
  *            Type of the Tuple
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
index 643e240..10e0a93 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
  */
 @Internal
 public abstract class StreamElement {
-	
+
 	/**
 	 * Checks whether this element is a watermark.
 	 * @return True, if this element is a watermark, false otherwise.

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index f2b8aec..64396da 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -42,17 +42,17 @@ import static java.util.Objects.requireNonNull;
 public final class StreamElementSerializer<T> extends TypeSerializer<StreamElement> {
 
 	private static final long serialVersionUID = 1L;
-	
+
 	private static final int TAG_REC_WITH_TIMESTAMP = 0;
 	private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
 	private static final int TAG_WATERMARK = 2;
 	private static final int TAG_LATENCY_MARKER = 3;
 	private static final int TAG_STREAM_STATUS = 4;
-	
-	
+
+
 	private final TypeSerializer<T> typeSerializer;
 
-	
+
 	public StreamElementSerializer(TypeSerializer<T> serializer) {
 		if (serializer instanceof StreamElementSerializer) {
 			throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
@@ -67,7 +67,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public boolean isImmutableType() {
 		return false;
@@ -82,7 +82,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public StreamRecord<T> createInstance() {
 		return new StreamRecord<T>(typeSerializer.createInstance());
@@ -160,7 +160,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 	public void serialize(StreamElement value, DataOutputView target) throws IOException {
 		if (value.isRecord()) {
 			StreamRecord<T> record = value.asRecord();
-			
+
 			if (record.hasTimestamp()) {
 				target.write(TAG_REC_WITH_TIMESTAMP);
 				target.writeLong(record.getTimestamp());
@@ -187,7 +187,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 			throw new RuntimeException();
 		}
 	}
-	
+
 	@Override
 	public StreamElement deserialize(DataInputView source) throws IOException {
 		int tag = source.readByte();
@@ -242,7 +242,7 @@ public final class StreamElementSerializer<T> extends TypeSerializer<StreamEleme
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof StreamElementSerializer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 0395897..a0610ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -21,21 +21,21 @@ import org.apache.flink.annotation.Internal;
 
 /**
  * One value in a data stream. This stores the value and an optional associated timestamp.
- * 
+ *
  * @param <T> The type encapsulated with the stream record.
  */
 @Internal
 public final class StreamRecord<T> extends StreamElement {
-	
+
 	/** The actual value held by this record. */
 	private T value;
-	
+
 	/** The timestamp of the record. */
 	private long timestamp;
 
 	/** Flag whether the timestamp is actually set. */
 	private boolean hasTimestamp;
-	
+
 	/**
 	 * Creates a new StreamRecord. The record does not have a timestamp.
 	 */
@@ -59,7 +59,7 @@ public final class StreamRecord<T> extends StreamElement {
 	// ------------------------------------------------------------------------
 	//  Accessors
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Returns the value wrapped in this stream value.
 	 */
@@ -82,7 +82,7 @@ public final class StreamRecord<T> extends StreamElement {
 	}
 
 	/** Checks whether this record has a timestamp.
-	 * 
+	 *
  	 * @return True if the record has a timestamp, false if not.
 	 */
 	public boolean hasTimestamp() {
@@ -92,7 +92,7 @@ public final class StreamRecord<T> extends StreamElement {
 	// ------------------------------------------------------------------------
 	//  Updating
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Replace the currently stored value by the given new value. This returns a StreamElement
 	 * with the generic type parameter that matches the new value while keeping the old
@@ -121,10 +121,10 @@ public final class StreamRecord<T> extends StreamElement {
 		this.timestamp = timestamp;
 		this.value = (T) value;
 		this.hasTimestamp = true;
-		
+
 		return (StreamRecord<X>) this;
 	}
-	
+
 	public void setTimestamp(long timestamp) {
 		this.timestamp = timestamp;
 		this.hasTimestamp = true;
@@ -133,7 +133,7 @@ public final class StreamRecord<T> extends StreamElement {
 	public void eraseTimestamp() {
 		this.hasTimestamp = false;
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Copying
 	// ------------------------------------------------------------------------
@@ -162,7 +162,7 @@ public final class StreamRecord<T> extends StreamElement {
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
index 39d3ed2..1af81d0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
 
 /**
  * An exception for wrapping exceptions that are thrown by an operator in threads other than the
- * main compute thread of that operator. 
+ * main compute thread of that operator.
  */
 @Internal
 public class AsynchronousException extends Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
index c36d45d..9d7c3d7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
@@ -37,12 +37,12 @@ public class ExceptionInChainedOperatorException extends RuntimeException {
 	public ExceptionInChainedOperatorException(String message, Throwable cause) {
 		super(message, requireNonNull(cause));
 	}
-	
+
 	public Throwable getOriginalCause() {
 		Throwable ex = this;
 		do {
 			ex = ex.getCause();
 		} while (ex instanceof ExceptionInChainedOperatorException);
-		return ex; 
+		return ex;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 0343b54..f461e31 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
 
 	private StreamInputProcessor<IN> inputProcessor;
-	
+
 	private volatile boolean running = true;
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index aced2f4..93cf646 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -61,19 +61,19 @@ import java.util.Random;
 /**
  * The {@code OperatorChain} contains all operators that are executed as one chain within a single
  * {@link StreamTask}.
- * 
+ *
  * @param <OUT> The type of elements accepted by the chain, i.e., the input type of the chain's
  *              head operator.
  */
 @Internal
 public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
-	
+
 	private final StreamOperator<?>[] allOperators;
 
 	private final RecordWriterOutput<?>[] streamOutputs;
-	
+
 	private final Output<StreamRecord<OUT>> chainEntryPoint;
 
 	private final OP headOperator;
@@ -87,7 +87,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 	private StreamStatus streamStatus = StreamStatus.ACTIVE;
 
 	public OperatorChain(StreamTask<OUT, OP> containingTask) {
-		
+
 		final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
 		final StreamConfig configuration = containingTask.getConfiguration();
 
@@ -102,7 +102,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
 		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
 		this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
-		
+
 		// from here on, we need to make sure that the output writers are shut down again on failure
 		boolean success = false;
 		try {
@@ -112,11 +112,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				RecordWriterOutput<?> streamOutput = createStreamOutput(
 						outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
 						containingTask.getEnvironment(), containingTask.getName());
-	
+
 				this.streamOutputs[i] = streamOutput;
 				streamOutputMap.put(outEdge, streamOutput);
 			}
-	
+
 			// we create the chain of operators and grab the collector that leads into the chain
 			List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
 			this.chainEntryPoint = createOutputCollector(containingTask, configuration,
@@ -131,7 +131,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			allOps.add(headOperator);
 
 			this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
-			
+
 			success = true;
 		}
 		finally {
@@ -146,7 +146,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				}
 			}
 		}
-		
+
 	}
 
 	@Override
@@ -193,7 +193,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 	public RecordWriterOutput<?>[] getStreamOutputs() {
 		return streamOutputs;
 	}
-	
+
 	public StreamOperator<?>[] getAllOperators() {
 		return allOperators;
 	}
@@ -247,7 +247,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 	// ------------------------------------------------------------------------
 	//  initialization utilities
 	// ------------------------------------------------------------------------
-	
+
 	private <T> Output<StreamRecord<T>> createOutputCollector(
 			StreamTask<?, ?> containingTask,
 			StreamConfig operatorConfig,
@@ -257,7 +257,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			List<StreamOperator<?>> allOperators)
 	{
 		List<Tuple2<Output<StreamRecord<T>>, StreamEdge>> allOutputs = new ArrayList<>(4);
-		
+
 		// create collectors for the network outputs
 		for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
 			@SuppressWarnings("unchecked")
@@ -275,12 +275,12 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 					containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators, outputEdge.getOutputTag());
 			allOutputs.add(new Tuple2<>(output, outputEdge));
 		}
-		
+
 		// if there are multiple outputs, or the outputs are directed, we need to
 		// wrap them as one output
-		
+
 		List<OutputSelector<T>> selectors = operatorConfig.getOutputSelectors(userCodeClassloader);
-		
+
 		if (selectors == null || selectors.isEmpty()) {
 			// simple path, no selector necessary
 			if (allOutputs.size() == 1) {
@@ -316,10 +316,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			} else {
 				return new DirectedOutput<>(selectors, allOutputs);
 			}
-			
+
 		}
 	}
-	
+
 	private <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
 			StreamTask<?, ?> containingTask,
 			StreamConfig operatorConfig,
@@ -348,7 +348,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			return new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
 		}
 	}
-	
+
 	private <T> RecordWriterOutput<T> createStreamOutput(
 			StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
 			Environment taskEnvironment,
@@ -370,7 +370,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
 
 		LOG.debug("Using partitioner {} for output {} of task ", outputPartitioner, outputIndex, taskName);
-		
+
 		ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
 
 		// we initialize the partitioner here with the number of key groups (aka max. parallelism)
@@ -384,16 +384,16 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
 				new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
 		output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
-		
+
 		return new RecordWriterOutput<>(output, outSerializer, sideOutputTag, this);
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Collectors for output chaining
-	// ------------------------------------------------------------------------ 
+	// ------------------------------------------------------------------------
 
 	private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
-		
+
 		protected final OneInputStreamOperator<T, ?> operator;
 		protected final Counter numRecordsIn;
 
@@ -482,7 +482,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 	}
 
 	private static final class CopyingChainingOutput<T> extends ChainingOutput<T> {
-		
+
 		private final TypeSerializer<T> serializer;
 
 		public CopyingChainingOutput(
@@ -533,9 +533,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 
 		}
 	}
-	
+
 	private static class BroadcastingOutputCollector<T> implements Output<StreamRecord<T>> {
-		
+
 		protected final Output<StreamRecord<T>>[] outputs;
 
 		private final Random RNG = new XORShiftRandom();

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
index 1164ea0..9bccae0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
@@ -33,7 +33,7 @@ public interface ProcessingTimeCallback {
 	 * <p>If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled
 	 * due to a garbage collection), the timestamp supplied to this function will still be the
 	 * original timestamp for which the trigger was scheduled.
-	 * 
+	 *
 	 * @param timestamp The timestamp for which the trigger event was scheduled.
 	 */
 	void onProcessingTime(long timestamp) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 240aba8..11074a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -22,10 +22,10 @@ import java.util.concurrent.ScheduledFuture;
 /**
  * Defines the current processing time and handles all related actions,
  * such as register timers for tasks to be executed in the future.
- * 
+ *
  * <p>The access to the time via {@link #getCurrentProcessingTime()} is always available, regardless of
  * whether the timer service has been shut down.
- * 
+ *
  * <p>The registration of timers follows a life cycle of three phases:
  * <ol>
  *     <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li>
@@ -46,10 +46,10 @@ public abstract class ProcessingTimeService {
 
 	/**
 	 * Registers a task to be executed when (processing) time is {@code timestamp}.
-	 * 
+	 *
 	 * @param timestamp   Time when the task is to be executed (in processing time)
 	 * @param target      The task to be executed
-	 * 
+	 *
 	 * @return The future that represents the scheduled task. This always returns some future,
 	 *         even if the timer was shut down
 	 */
@@ -75,7 +75,7 @@ public abstract class ProcessingTimeService {
 	 * returns for each call to {@link #registerTimer(long, ProcessingTimeCallback)} only a "mock" future.
 	 * Furthermore, the method clears all not yet started timers, and awaits the completion
 	 * of currently executing timers.
-	 * 
+	 *
 	 * <p>This method can be used to cleanly shut down the timer service. The using components
 	 * will not notice that the service is shut down (as for example via exceptions when registering
 	 * a new timer), but the service will simply not fire any timer any more.

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index a3913ce..66e92df 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -49,13 +49,13 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 	protected void cleanup() {
 		// does not hold any resources, so no cleanup needed
 	}
-	
+
 
 	@Override
 	protected void run() throws Exception {
 		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
 	}
-	
+
 	@Override
 	protected void cancelTask() throws Exception {
 		if (headOperator != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index d05ec37..0d2fe1f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -42,18 +42,18 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 	private volatile boolean running = true;
 
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	protected void run() throws Exception {
-		
+
 		final String iterationId = getConfiguration().getIterationId();
 		if (iterationId == null || iterationId.length() == 0) {
 			throw new Exception("Missing iteration ID in the task configuration");
 		}
-		
+
 		final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId ,
 				getEnvironment().getTaskInfo().getIndexOfThisSubtask());
-		
+
 		final long iterationWaitTime = getConfiguration().getIterationWaitTime();
 		final boolean shouldWait = iterationWaitTime > 0;
 
@@ -63,7 +63,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 		BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
 		LOG.info("Iteration head {} added feedback queue under {}", getName(), brokerID);
 
-		// do the work 
+		// do the work
 		try {
 			@SuppressWarnings("unchecked")
 			RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
@@ -114,7 +114,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 	protected void cleanup() throws Exception {
 		// does not hold any resources, no cleanup necessary
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -123,7 +123,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 	 * Creates the identification string with which head and tail task find the shared blocking
 	 * queue for the back channel. The identification string is unique per parallel head/tail pair
 	 * per iteration per job.
-	 * 
+	 *
 	 * @param jid The job ID.
 	 * @param iterationID The id of the iteration in the job.
 	 * @param subtaskIndex The parallel subtask number

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index 7326a80..df5edb1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -70,7 +70,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 	}
 
 	private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
-		
+
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -93,9 +93,9 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 
 		@SuppressWarnings("NonSerializableFieldInSerializableClass")
 		private final BlockingQueue<StreamRecord<IN>> dataChannel;
-		
+
 		private final long iterationWaitTime;
-		
+
 		private final boolean shouldWait;
 
 		IterationTailOutput(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e73c825..da4f9b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -543,7 +543,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
 		}
 		catch (CancelTaskException e) {
-			LOG.info("Operator {} was cancelled while performing checkpoint {}.", 
+			LOG.info("Operator {} was cancelled while performing checkpoint {}.",
 					getName(), checkpointMetaData.getCheckpointId());
 			throw e;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
index 94e2213..8bfb9a0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
@@ -39,7 +39,7 @@ public class StreamTaskException extends RuntimeException {
 
 	/**
 	 * Creates a compiler exception with the given message and no cause.
-	 * 
+	 *
 	 * @param message
 	 *            The message for the exception.
 	 */
@@ -49,7 +49,7 @@ public class StreamTaskException extends RuntimeException {
 
 	/**
 	 * Creates a compiler exception with the given cause and no message.
-	 * 
+	 *
 	 * @param cause
 	 *            The <tt>Throwable</tt> that caused this exception.
 	 */
@@ -59,7 +59,7 @@ public class StreamTaskException extends RuntimeException {
 
 	/**
 	 * Creates a compiler exception with the given message and cause.
-	 * 
+	 *
 	 * @param message
 	 *            The message for the exception.
 	 * @param cause

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 4c2d731..34202fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -169,7 +169,7 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 
 	@Override
 	public void shutdownService() {
-		if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) || 
+		if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) ||
 				status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN))
 		{
 			timerService.shutdownNow();

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 50aefc8..7214201 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -35,24 +35,24 @@ import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
 	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
-	
+
 	private volatile boolean running = true;
 
 	@Override
 	public void init() throws Exception {
 		StreamConfig configuration = getConfiguration();
 		ClassLoader userClassLoader = getUserCodeClassLoader();
-		
+
 		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
 		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
-	
+
 		int numberOfInputs = configuration.getNumberOfInputs();
-	
+
 		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
 		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
-	
+
 		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-	
+
 		for (int i = 0; i < numberOfInputs; i++) {
 			int inputType = inEdges.get(i).getTypeNumber();
 			InputGate reader = getEnvironment().getInputGate(i);

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
index b377d70..4344046 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/keys/KeySelectorUtil.java
@@ -50,7 +50,7 @@ public final class KeySelectorUtil {
 		}
 
 		CompositeType<X> compositeType = (CompositeType<X>) typeInfo;
-		
+
 		int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
 		int numKeyFields = logicalKeyPositions.length;
 
@@ -69,9 +69,9 @@ public final class KeySelectorUtil {
 		if (positions == null || positions.length == 0 || positions.length > Tuple.MAX_ARITY) {
 			throw new IllegalArgumentException("Array keys must have between 1 and " + Tuple.MAX_ARITY + " fields.");
 		}
-		
+
 		TypeInformation<?> componentType;
-		
+
 		if (typeInfo instanceof BasicArrayTypeInfo) {
 			BasicArrayTypeInfo<X, ?>  arrayInfo = (BasicArrayTypeInfo<X, ?>) typeInfo;
 			componentType = arrayInfo.getComponentInfo();
@@ -83,14 +83,14 @@ public final class KeySelectorUtil {
 		else {
 			throw new IllegalArgumentException("This method only supports arrays of primitives and boxed primitives.");
 		}
-		
+
 		TypeInformation<?>[] primitiveInfos = new TypeInformation<?>[positions.length];
 		Arrays.fill(primitiveInfos, componentType);
 
 		return new ArrayKeySelector<>(positions, new TupleTypeInfo<>(primitiveInfos));
 	}
 
-	
+
 	public static <X, K> KeySelector<X, K> getSelectorForOneKey(
 			Keys<X> keys, Partitioner<K> partitioner, TypeInformation<X> typeInfo, ExecutionConfig executionConfig)
 	{
@@ -107,7 +107,7 @@ public final class KeySelectorUtil {
 		if (logicalKeyPositions.length != 1) {
 			throw new IllegalArgumentException("There must be exactly 1 key specified");
 		}
-		
+
 		TypeComparator<X> comparator = compositeType.createComparator(
 				logicalKeyPositions, new boolean[] { true }, 0, executionConfig);
 		return new OneKeySelector<>(comparator);
@@ -125,8 +125,8 @@ public final class KeySelectorUtil {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Key extractor that extracts a single field via a generic comparator. 
-	 * 
+	 * Key extractor that extracts a single field via a generic comparator.
+	 *
 	 * @param <IN> The type of the elements where the key is extracted from.
 	 * @param <K> The type of the key.
 	 */
@@ -140,7 +140,7 @@ public final class KeySelectorUtil {
 		 * are null), it does not have any serialization problems */
 		@SuppressWarnings("NonSerializableFieldInSerializableClass")
 		private final Object[] keyArray;
-		
+
 		OneKeySelector(TypeComparator<IN> comparator) {
 			this.comparator = comparator;
 			this.keyArray = new Object[1];
@@ -155,7 +155,7 @@ public final class KeySelectorUtil {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * A key selector for selecting key fields via a TypeComparator.
 	 *
@@ -201,16 +201,16 @@ public final class KeySelectorUtil {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * A key selector for selecting individual array fields as keys and returns them as a Tuple.
-	 * 
+	 *
 	 * @param <IN> The type from which the key is extracted, i.e., the array type.
 	 */
 	public static final class ArrayKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private final int[] fields;
 		private final Class<? extends Tuple> tupleClass;
 		private transient TupleTypeInfo<Tuple> returnType;

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
index 77e76e5..02ea004 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
@@ -27,10 +27,10 @@ import java.io.IOException;
  * The deserialization schema describes how to turn the byte messages delivered by certain
  * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
  * processed by Flink.
- * 
+ *
  * <p>This base variant of the deserialization schema produces the type information
  * automatically by extracting it from the generic class arguments.
- * 
+ *
  * @param <T> The type created by the deserialization schema.
  */
 public abstract class AbstractDeserializationSchema<T> implements DeserializationSchema<T> {
@@ -49,7 +49,7 @@ public abstract class AbstractDeserializationSchema<T> implements Deserializatio
 	/**
 	 * Method to decide whether the element signals the end of the stream. If
 	 * true is returned the element won't be emitted.
-	 * 
+	 *
 	 * <p>This default implementation returns always false, meaning the stream is interpreted
 	 * to be unbounded.
 	 *
@@ -60,7 +60,7 @@ public abstract class AbstractDeserializationSchema<T> implements Deserializatio
 	public boolean isEndOfStream(T nextElement) {
 		return false;
 	}
-	
+
 	@Override
 	public TypeInformation<T> getProducedType() {
 		return TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 0, null, null);

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index 03cab20..57cb27e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -27,10 +27,10 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
  * The deserialization schema describes how to turn the byte messages delivered by certain
  * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
  * processed by Flink.
- * 
+ *
  * <p>Note: In most cases, one should start from {@link AbstractDeserializationSchema}, which
  * takes care of producing the return type information automatically.
- * 
+ *
  * @param <T> The type created by the deserialization schema.
  */
 @Public
@@ -38,7 +38,7 @@ public interface DeserializationSchema<T> extends Serializable, ResultTypeQuerya
 
 	/**
 	 * Deserializes the byte message.
-	 * 
+	 *
 	 * @param message The message, as a byte array.
 	 *
 	 * @return The deserialized message as an object (null if the message cannot be deserialized).
@@ -48,7 +48,7 @@ public interface DeserializationSchema<T> extends Serializable, ResultTypeQuerya
 	/**
 	 * Method to decide whether the element signals the end of the stream. If
 	 * true is returned the element won't be emitted.
-	 * 
+	 *
 	 * @param nextElement The element to test for the end-of-stream signal.
 	 * @return True, if the element signals end of stream, false otherwise.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
index 27c85e3..986cfb3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
@@ -25,7 +25,7 @@ import java.io.Serializable;
  * The serialization schema describes how to turn a data object into a different serialized
  * representation. Most data sinks (for example Apache Kafka) require the data to be handed
  * to them in a specific format (for example as byte strings).
- * 
+ *
  * @param <T> The type to be serialized.
  */
 @Public
@@ -33,7 +33,7 @@ public interface SerializationSchema<T> extends Serializable {
 
 	/**
 	 * Serializes the incoming element to a specified type.
-	 * 
+	 *
 	 * @param element
 	 *            The incoming element to be serialized
 	 * @return The serialized element.

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index ddc55a2..27ba9e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -30,7 +30,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Very simple serialization schema for strings.
- * 
+ *
  * <p>By default, the serializer uses "UTF-8" for string/byte conversion.
  */
 @PublicEvolving
@@ -51,7 +51,7 @@ public class SimpleStringSchema implements DeserializationSchema<String>, Serial
 
 	/**
 	 * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
-	 * 
+	 *
 	 * @param charset The charset to use to convert between strings and bytes.
 	 */
 	public SimpleStringSchema(Charset charset) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5aa93a27/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
index e81a1c7..b187b04 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
@@ -30,20 +30,20 @@ import java.io.IOException;
 /**
  * A serialization and deserialization schema that uses Flink's serialization stack to
  * transform typed from and to byte arrays.
- * 
+ *
  * @param <T> The type to be serialized.
  */
 @Public
 public class TypeInformationSerializationSchema<T> implements DeserializationSchema<T>, SerializationSchema<T> {
-	
+
 	private static final long serialVersionUID = -5359448468131559102L;
-	
+
 	/** The serializer for the actual de-/serialization. */
 	private final TypeSerializer<T> serializer;
 
 	/** The reusable output serialization buffer. */
 	private transient DataOutputSerializer dos;
-	
+
 	/** The reusable input deserialization buffer. */
 	private transient DataInputDeserializer dis;
 
@@ -58,7 +58,7 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 
 	/**
 	 * Creates a new de-/serialization schema for the given type.
-	 * 
+	 *
 	 * @param typeInfo The type information for the type de-/serialized by this schema.
 	 * @param ec The execution config, which is used to parametrize the type serializers.
 	 */
@@ -68,7 +68,7 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public T deserialize(byte[] message) {
 		if (dis != null) {
@@ -76,7 +76,7 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 		} else {
 			dis = new DataInputDeserializer(message, 0, message.length);
 		}
-		
+
 		try {
 			return serializer.deserialize(dis);
 		}
@@ -100,14 +100,14 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 		if (dos == null) {
 			dos = new DataOutputSerializer(16);
 		}
-		
+
 		try {
 			serializer.serialize(element, dos);
 		}
 		catch (IOException e) {
 			throw new RuntimeException("Unable to serialize record", e);
 		}
-		
+
 		byte[] ret = dos.getByteArray();
 		if (ret.length != dos.length()) {
 			byte[] n = new byte[dos.length()];


Mime
View raw message