flink-commits mailing list archives

From: se...@apache.org
Subject: [20/22] flink git commit: [hotfix] [runtime] Fix checkstyle for 'runtime/io/network/api'.
Date: Thu, 01 Feb 2018 15:46:52 GMT
[hotfix] [runtime] Fix checkstyle for 'runtime/io/network/api'.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4675f2a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4675f2a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4675f2a

Branch: refs/heads/master
Commit: b4675f2a3aa1b5a1635923b8af7dbea88902154a
Parents: 4e481a7
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Jan 26 10:42:34 2018 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 1 13:54:56 2018 +0100

----------------------------------------------------------------------
 .../io/network/api/CancelCheckpointMarker.java  |   8 +-
 .../io/network/api/CheckpointBarrier.java       |  18 +--
 .../io/network/api/EndOfPartitionEvent.java     |  10 +-
 .../io/network/api/EndOfSuperstepEvent.java     |  10 +-
 .../io/network/api/TaskEventHandler.java        |   2 +-
 .../io/network/api/reader/AbstractReader.java   |   7 +-
 .../api/reader/AbstractRecordReader.java        |   4 +-
 .../io/network/api/reader/MutableReader.java    |   4 +-
 .../network/api/reader/MutableRecordReader.java |   7 +-
 .../runtime/io/network/api/reader/Reader.java   |   4 +-
 .../io/network/api/reader/ReaderBase.java       |   4 +-
 .../io/network/api/reader/RecordReader.java     |   6 +-
 .../AdaptiveSpanningRecordDeserializer.java     |  31 ++--
 .../api/serialization/EventSerializer.java      |   2 +-
 .../api/serialization/RecordDeserializer.java   |  14 +-
 .../api/serialization/RecordSerializer.java     |  14 +-
 .../serialization/SpanningRecordSerializer.java |  17 +--
 ...llingAdaptiveSpanningRecordDeserializer.java | 153 +++++++++----------
 .../io/network/api/writer/RecordWriter.java     |  13 +-
 .../api/writer/RoundRobinChannelSelector.java   |   2 -
 .../io/network/api/CheckpointBarrierTest.java   |   5 +-
 .../network/api/reader/AbstractReaderTest.java  |   1 +
 .../api/serialization/EventSerializerTest.java  |  29 ++--
 .../api/serialization/PagedViewsTest.java       |  63 ++++----
 .../SpanningRecordSerializationTest.java        |  76 +++++----
 .../SpanningRecordSerializerTest.java           |  54 ++++---
 .../io/network/api/writer/RecordWriterTest.java |  40 +----
 tools/maven/suppressions-runtime.xml            |   4 +-
 28 files changed, 289 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
index 52a2517..207df52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
@@ -31,7 +31,7 @@ import java.io.IOException;
  */
 public class CancelCheckpointMarker extends RuntimeEvent {
 
-	/** The id of the checkpoint to be canceled */
+	/** The id of the checkpoint to be canceled. */
 	private final long checkpointId;
 
 	public CancelCheckpointMarker(long checkpointId) {
@@ -44,7 +44,7 @@ public class CancelCheckpointMarker extends RuntimeEvent {
 
 	// ------------------------------------------------------------------------
 	// These known and common events go through special code paths, rather than
-	// through generic serialization 
+	// through generic serialization.
 
 	@Override
 	public void write(DataOutputView out) throws IOException {
@@ -55,7 +55,7 @@ public class CancelCheckpointMarker extends RuntimeEvent {
 	public void read(DataInputView in) throws IOException {
 		throw new UnsupportedOperationException("this method should never be called");
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	@Override
@@ -65,7 +65,7 @@ public class CancelCheckpointMarker extends RuntimeEvent {
 
 	@Override
 	public boolean equals(Object other) {
-		return other != null && 
+		return other != null &&
 				other.getClass() == CancelCheckpointMarker.class &&
 				this.checkpointId == ((CancelCheckpointMarker) other).checkpointId;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 97ad90f..78d6707 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -18,28 +18,28 @@
 
 package org.apache.flink.runtime.io.network.api;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.io.IOException;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.RuntimeEvent;
 
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Checkpoint barriers are used to align checkpoints throughout the streaming topology. The
  * barriers are emitted by the sources when instructed to do so by the JobManager. When
- * operators receive a CheckpointBarrier on one of its inputs, it knows that this is the point 
+ * operators receive a CheckpointBarrier on one of its inputs, it knows that this is the point
  * between the pre-checkpoint and post-checkpoint data.
- * 
+ *
  * <p>Once an operator has received a checkpoint barrier from all its input channels, it
  * knows that a certain checkpoint is complete. It can trigger the operator specific checkpoint
  * behavior and broadcast the barrier to downstream operators.
- * 
+ *
 * <p>Depending on the semantic guarantees, an operator may hold off post-checkpoint data until the checkpoint
  * is complete (exactly once).
- * 
+ *
 * <p>The checkpoint barrier IDs are strictly monotonically increasing.
  */
 public class CheckpointBarrier extends RuntimeEvent {
@@ -75,7 +75,7 @@ public class CheckpointBarrier extends RuntimeEvent {
 	//  but would require the CheckpointBarrier to be mutable. Since all serialization
 	//  for events goes through the EventSerializer class, which has special serialization
 	//  for the CheckpointBarrier, we don't need these methods
-	// 
+	//
 
 	@Override
 	public void write(DataOutputView out) throws IOException {

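For context, a minimal sketch of the alignment rule the Javadoc above describes: an operator counts barriers per checkpoint ID and knows a checkpoint is complete for it once every input channel has delivered one. This `BarrierTracker` helper is purely illustrative and not part of this commit; Flink's actual alignment logic lives in the runtime's barrier handlers.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical illustration of barrier alignment; not Flink's implementation.
    class BarrierTracker {
        private final int numberOfInputChannels;
        private final Map<Long, Integer> barriersSeen = new HashMap<>();

        BarrierTracker(int numberOfInputChannels) {
            this.numberOfInputChannels = numberOfInputChannels;
        }

        /** Returns true once a barrier for this checkpoint arrived on every input channel. */
        boolean onBarrier(long checkpointId) {
            int seen = barriersSeen.merge(checkpointId, 1, Integer::sum);
            if (seen == numberOfInputChannels) {
                barriersSeen.remove(checkpointId);
                return true; // the operator may now take its snapshot and forward the barrier
            }
            return false;
        }
    }
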
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
index 293287b..3422341 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
@@ -27,16 +27,16 @@ import org.apache.flink.runtime.event.RuntimeEvent;
  */
 public class EndOfPartitionEvent extends RuntimeEvent {
 
-	/** The singleton instance of this event */
+	/** The singleton instance of this event. */
 	public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent();
-	
+
 	// ------------------------------------------------------------------------
 
 	// not instantiable
 	private EndOfPartitionEvent() {}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public void read(DataInputView in) {
 		// Nothing to do here
@@ -48,7 +48,7 @@ public class EndOfPartitionEvent extends RuntimeEvent {
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public int hashCode() {
 		return 1965146673;

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
index 7f51187..0603e32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
@@ -27,16 +27,16 @@ import org.apache.flink.runtime.event.RuntimeEvent;
  */
 public class EndOfSuperstepEvent extends RuntimeEvent {
 
-	/** The singleton instance of this event */
+	/** The singleton instance of this event. */
 	public static final EndOfSuperstepEvent INSTANCE = new EndOfSuperstepEvent();
 
 	// ------------------------------------------------------------------------
-	
-	// not instantiable
+
+	/** This class is not meant to be instantiated. */
 	private EndOfSuperstepEvent() {}
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public void write(DataOutputView out) {}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
index 4121587..16ca5a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java
@@ -30,7 +30,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Multimap;
  */
 public class TaskEventHandler {
 
-	/** Listeners for each event type */
+	/** Listeners for each event type. */
 	private final Multimap<Class<? extends TaskEvent>, EventListener<TaskEvent>> listeners = HashMultimap.create();
 
 	public void subscribe(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
index 3a343bf..aaa21e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java
@@ -110,7 +110,7 @@ public abstract class AbstractReader implements ReaderBase {
 			throw new IOException("Error while handling event of type " + eventType + ": " + t.getMessage(), t);
 		}
 	}
-	
+
 	public void publish(TaskEvent event){
 		taskEventHandler.publish(event);
 	}
@@ -134,11 +134,8 @@ public abstract class AbstractReader implements ReaderBase {
 
 	@Override
 	public boolean hasReachedEndOfSuperstep() {
-		if (isIterative) {
-			return currentNumberOfEndOfSuperstepEvents == inputGate.getNumberOfInputChannels();
-		}
+		return isIterative && currentNumberOfEndOfSuperstepEvents == inputGate.getNumberOfInputChannels();
 
-		return false;
 	}
 
 	private boolean incrementEndOfSuperstepEventAndCheck() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 1ac5f75..e3c8484 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -30,8 +30,8 @@ import java.io.IOException;
 
 /**
  * A record-oriented reader.
- * <p>
- * This abstract base class is used by both the mutable and immutable record readers.
+ *
+ * <p>This abstract base class is used by both the mutable and immutable record readers.
  *
  * @param <T> The type of the record that can be read with this record reader.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java
index e47982e..a03fc4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import java.io.IOException;
-
 import org.apache.flink.core.io.IOReadableWritable;
 
+import java.io.IOException;
+
 /**
  * A record-oriented reader for mutable record types.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
index 9836ba4..e5242e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
@@ -23,12 +23,17 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
 import java.io.IOException;
 
+/**
+ * Implementation of the record-oriented reader for mutable record types.
+ *
+ * @param <T> The type of the record that is read.
+ */
 public class MutableRecordReader<T extends IOReadableWritable> extends AbstractRecordReader<T> implements MutableReader<T> {
 
 	/**
 	 * Creates a new MutableRecordReader that de-serializes records from the given input gate and
 	 * can spill partial records to disk, if they grow large.
-	 * 
+	 *
 	 * @param inputGate The input gate to read from.
 	 * @param tmpDirectories The temp directories. Used for spilling if the reader concurrently
 	 *                       reconstructs multiple large records.

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java
index 7255390..8295980 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import java.io.IOException;
-
 import org.apache.flink.core.io.IOReadableWritable;
 
+import java.io.IOException;
+
 /**
  * A record-oriented reader for immutable record types.
  */

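As a usage note, a sketch of how the two reader flavors differ for a caller. This commit only reorders imports, so the method shapes below are an assumption, modeled as stand-in interfaces rather than the real Flink ones: immutable readers hand out a fresh record per call, mutable readers deserialize into a caller-provided instance.

    import java.io.IOException;

    // Stand-in interfaces (assumed shapes, not the real Flink Reader/MutableReader).
    interface ReaderSketch<T> { boolean hasNext(); T next() throws IOException; }
    interface MutableReaderSketch<T> { boolean next(T target) throws IOException; }

    class ReadLoops {
        static <T> void drainImmutable(ReaderSketch<T> reader) throws IOException {
            while (reader.hasNext()) {
                T record = reader.next(); // a fresh record instance per call
            }
        }

        static <T> void drainMutable(MutableReaderSketch<T> reader, T reuse) throws IOException {
            while (reader.next(reuse)) {
                // 'reuse' now holds the next record; no per-record allocation
            }
        }
    }
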
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
index 0cc77f0..6671012 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.runtime.io.network.api.reader;
 
-import java.io.IOException;
-
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.util.event.EventListener;
 
+import java.io.IOException;
+
 /**
  * The basic API for every reader.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
index 9eed374..4bcf8be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
@@ -23,6 +23,11 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
 import java.io.IOException;
 
+/**
+ * Record-oriented reader for immutable types.
+ *
+ * @param <T> The type of the records that are read.
+ */
 public class RecordReader<T extends IOReadableWritable> extends AbstractRecordReader<T> implements Reader<T> {
 
 	private final Class<T> recordType;
@@ -85,5 +90,4 @@ public class RecordReader<T extends IOReadableWritable> extends AbstractRecordRe
 			throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e);
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
index 04a7a21..598216a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -34,9 +34,6 @@ import java.nio.ByteOrder;
 /**
  * @param <T> The type of the record to be deserialized.
  */
-/**
- * @param <T> The type of the record to be deserialized.
- */
 public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> {
 
 	private final NonSpanningWrapper nonSpanningWrapper;
@@ -264,7 +261,7 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 					}
 				}
 			}
-			catch (EOFException eofex) {}
+			catch (EOFException ignored) {}
 
 			if (bld.length() == 0) {
 				return null;
@@ -300,7 +297,7 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 
 			int c, char2, char3;
 			int count = 0;
-			int chararr_count = 0;
+			int chararrCount = 0;
 
 			readFully(bytearr, 0, utflen);
 
@@ -310,7 +307,7 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 					break;
 				}
 				count++;
-				chararr[chararr_count++] = (char) c;
+				chararr[chararrCount++] = (char) c;
 			}
 
 			while (count < utflen) {
@@ -325,7 +322,7 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 					case 6:
 					case 7:
 						count++;
-						chararr[chararr_count++] = (char) c;
+						chararr[chararrCount++] = (char) c;
 						break;
 					case 12:
 					case 13:
@@ -337,7 +334,7 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 						if ((char2 & 0xC0) != 0x80) {
 							throw new UTFDataFormatException("malformed input around byte " + count);
 						}
-						chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+						chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
 						break;
 					case 14:
 						count += 3;
@@ -349,14 +346,14 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 						if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
 							throw new UTFDataFormatException("malformed input around byte " + (count - 1));
 						}
-						chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+						chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F)));
 						break;
 					default:
 						throw new UTFDataFormatException("malformed input around byte " + count);
 				}
 			}
 			// The number of chars produced may be less than utflen
-			return new String(chararr, 0, chararr_count);
+			return new String(chararr, 0, chararrCount);
 		}
 
 		@Override
@@ -374,27 +371,27 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 		public void skipBytesToRead(int numBytes) throws IOException {
 			int skippedBytes = skipBytes(numBytes);
 
-			if(skippedBytes < numBytes){
+			if (skippedBytes < numBytes){
 				throw new EOFException("Could not skip " + numBytes + " bytes.");
 			}
 		}
 
 		@Override
 		public int read(byte[] b, int off, int len) throws IOException {
-			if(b == null){
+			if (b == null){
 				throw new NullPointerException("Byte array b cannot be null.");
 			}
 
-			if(off < 0){
+			if (off < 0){
 				throw new IllegalArgumentException("The offset off cannot be negative.");
 			}
 
-			if(len < 0){
+			if (len < 0){
 				throw new IllegalArgumentException("The length len cannot be negative.");
 			}
 
 			int toRead = Math.min(len, remaining());
-			this.segment.get(this.position,b,off, toRead);
+			this.segment.get(this.position, b, off, toRead);
 			this.position += toRead;
 
 			return toRead;

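The `readUTF` logic cleaned up above is the standard modified UTF-8 decoding used by `java.io.DataInputStream`. For reference, a minimal sketch of the matching encoder side (the format `DataOutputStream.writeUTF` produces, minus the unsigned-short length prefix that `readUTF` consumes first), which makes the 1-, 2-, and 3-byte cases above easier to follow:

    import java.io.ByteArrayOutputStream;

    class ModifiedUtf8 {
        /** Encodes chars the way the decoder above expects: 1, 2, or 3 bytes per char. */
        static byte[] encode(String s) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            for (int i = 0; i < s.length(); i++) {
                char c = s.charAt(i);
                if (c >= 0x0001 && c <= 0x007F) {
                    out.write(c);                              // 0xxxxxxx
                } else if (c <= 0x07FF) {                      // '\0' also takes this 2-byte form
                    out.write(0xC0 | ((c >> 6) & 0x1F));       // 110xxxxx
                    out.write(0x80 | (c & 0x3F));              // 10xxxxxx
                } else {
                    out.write(0xE0 | ((c >> 12) & 0x0F));      // 1110xxxx
                    out.write(0x80 | ((c >> 6) & 0x3F));       // 10xxxxxx
                    out.write(0x80 | (c & 0x3F));              // 10xxxxxx
                }
            }
            return out.toByteArray();
        }
    }
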
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index af0bfe1..f0123c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -293,7 +293,7 @@ public class EventSerializer {
 		final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
 
 		MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array());
-		
+
 		final Buffer buffer = new NetworkBuffer(data, FreeingBufferRecycler.INSTANCE, false);
 		buffer.setSize(serializedEvent.remaining());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
index dd8ea06..4f48d86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
@@ -16,21 +16,23 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network.api.serialization;
 
-import java.io.IOException;
-
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
+import java.io.IOException;
+
 /**
  * Interface for turning sequences of memory segments into records.
  */
 public interface RecordDeserializer<T extends IOReadableWritable> {
 
-	public static enum DeserializationResult {
+	/**
+	 * Status of the deserialization result.
+	 */
+	enum DeserializationResult {
 		PARTIAL_RECORD(false, true),
 		INTERMEDIATE_RECORD_FROM_BUFFER(true, false),
 		LAST_RECORD_FROM_BUFFER(true, true);
@@ -52,7 +54,7 @@ public interface RecordDeserializer<T extends IOReadableWritable> {
 			return this.isBufferConsumed;
 		}
 	}
-	
+
 	DeserializationResult getNextRecord(T target) throws IOException;
 
 	void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException;
@@ -62,6 +64,6 @@ public interface RecordDeserializer<T extends IOReadableWritable> {
 	Buffer getCurrentBuffer();
 
 	void clear();
-	
+
 	boolean hasUnfinishedData();
 }

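The `DeserializationResult` flags drive the canonical consume loop: keep pulling records from the current memory segment until the result says the buffer is consumed. A minimal sketch using the methods visible in this interface, assuming the obvious `isFullRecord()` getter alongside the `isBufferConsumed()` one shown above; record handling is a placeholder:

    import org.apache.flink.core.io.IOReadableWritable;
    import org.apache.flink.core.memory.MemorySegment;

    class DrainLoop {
        /** Pulls records from one memory segment until the deserializer has consumed it. */
        static <T extends IOReadableWritable> void drain(
                RecordDeserializer<T> deserializer, MemorySegment segment, int numBytes, T target)
                throws java.io.IOException {

            deserializer.setNextMemorySegment(segment, numBytes);

            while (true) {
                RecordDeserializer.DeserializationResult result = deserializer.getNextRecord(target);
                if (result.isFullRecord()) {
                    // 'target' now holds a complete record; hand it to the consumer here
                }
                if (result.isBufferConsumed()) {
                    break; // PARTIAL_RECORD or LAST_RECORD_FROM_BUFFER: need the next segment
                }
            }
        }
    }
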
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index f09bd4a..1eefc79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.io.IOReadableWritable;
@@ -30,16 +29,19 @@ import java.io.IOException;
  */
 public interface RecordSerializer<T extends IOReadableWritable> {
 
+	/**
+	 * Status of the serialization result.
+	 */
 	enum SerializationResult {
 		PARTIAL_RECORD_MEMORY_SEGMENT_FULL(false, true),
 		FULL_RECORD_MEMORY_SEGMENT_FULL(true, true),
 		FULL_RECORD(true, false);
-		
+
 		private final boolean isFullRecord;
 
 		private final boolean isFullBuffer;
-		
-		private SerializationResult(boolean isFullRecord, boolean isFullBuffer) {
+
+		SerializationResult(boolean isFullRecord, boolean isFullBuffer) {
 			this.isFullRecord = isFullRecord;
 			this.isFullBuffer = isFullBuffer;
 		}
@@ -71,7 +73,6 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	 * @param record the record to serialize
 	 * @return how much information was written to the target buffer and
 	 *         whether this buffer is full
-	 * @throws IOException
 	 */
 	SerializationResult addRecord(T record) throws IOException;
 
@@ -82,7 +83,6 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	 * @param bufferBuilder the new target buffer to use
 	 * @return how much information was written to the target buffer and
 	 *         whether this buffer is full
-	 * @throws IOException
 	 */
 	SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException;
 
@@ -90,7 +90,7 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	 * Retrieves the current target buffer and sets its size to the actual
 	 * number of written bytes.
 	 *
-	 * After calling this method, a new target buffer is required to continue
+	 * <p>After calling this method, a new target buffer is required to continue
 	 * writing (see {@link #setNextBufferBuilder(BufferBuilder)}).
 	 *
 	 * @return the target buffer that was used

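On the write side, `SerializationResult` implies the mirror-image loop: keep handing the serializer fresh target buffers until the record fits completely. A sketch under the assumption that the enum exposes an `isFullBuffer()` getter matching the field above; `requestNewBufferBuilder()` is a placeholder for however the caller obtains buffers:

    import org.apache.flink.core.io.IOReadableWritable;
    import org.apache.flink.runtime.io.network.buffer.BufferBuilder;

    class EmitLoop {
        static <T extends IOReadableWritable> void emit(RecordSerializer<T> serializer, T record)
                throws java.io.IOException {

            RecordSerializer.SerializationResult result = serializer.addRecord(record);

            // While the target buffer keeps filling up, continue the record into a fresh one.
            while (result.isFullBuffer()) {
                result = serializer.setNextBufferBuilder(requestNewBufferBuilder());
            }
        }

        /** Placeholder: in the runtime, new BufferBuilders come from a buffer pool. */
        static BufferBuilder requestNewBufferBuilder() {
            throw new UnsupportedOperationException("sketch only");
        }
    }
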
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 5e6f8e2..768c43e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -35,23 +35,23 @@ import java.nio.ByteOrder;
  * data serialization buffer and copies this buffer to target buffers
  * one-by-one using {@link #setNextBufferBuilder(BufferBuilder)}.
  *
- * @param <T>
+ * @param <T> The type of the records that are serialized.
  */
 public class SpanningRecordSerializer<T extends IOReadableWritable> implements RecordSerializer<T> {
 
-	/** Flag to enable/disable checks, if buffer not set/full or pending serialization */
+	/** Flag to enable/disable checks, if buffer not set/full or pending serialization. */
 	private static final boolean CHECKED = false;
 
-	/** Intermediate data serialization */
+	/** Intermediate data serialization. */
 	private final DataOutputSerializer serializationBuffer;
 
-	/** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}) */
+	/** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}). */
 	private ByteBuffer dataBuffer;
 
-	/** Intermediate buffer for length serialization */
+	/** Intermediate buffer for length serialization. */
 	private final ByteBuffer lengthBuffer;
 
-	/** Current target {@link Buffer} of the serializer */
+	/** Current target {@link Buffer} of the serializer. */
 	@Nullable
 	private BufferBuilder targetBuffer;
 
@@ -73,7 +73,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	 * @param record the record to serialize
 	 * @return how much information was written to the target buffer and
 	 *         whether this buffer is full
-	 * @throws IOException
 	 */
 	@Override
 	public SerializationResult addRecord(T record) throws IOException {
@@ -114,14 +113,14 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		}
 
 		SerializationResult result = getSerializationResult();
-		
+
 		// make sure we don't hold onto the large buffers for too long
 		if (result.isFullRecord()) {
 			serializationBuffer.clear();
 			serializationBuffer.pruneBuffer();
 			dataBuffer = serializationBuffer.wrapAsByteBuffer();
 		}
-		
+
 		return result;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 74260d4..985a93e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;
@@ -42,18 +42,17 @@ import java.util.Random;
  * @param <T> The type of the record to be deserialized.
  */
 public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> {
-	
+
 	private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE =
 					"Serializer consumed more bytes than the record had. " +
 					"This indicates broken serialization. If you are using custom serialization types " +
 					"(Value or Writable), check their serialization methods. If you are using a " +
 					"Kryo-serialized type, check the corresponding Kryo serializer.";
-	
+
 	private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes
-	
-	
+
 	private final NonSpanningWrapper nonSpanningWrapper;
-	
+
 	private final SpanningWrapper spanningWrapper;
 
 	private Buffer currentBuffer;
@@ -79,7 +78,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		currentBuffer = null;
 		return tmp;
 	}
-	
+
 	@Override
 	public void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException {
 		// check if some spanning record deserialization is pending
@@ -90,15 +89,15 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			this.nonSpanningWrapper.initializeFromMemorySegment(segment, 0, numBytes);
 		}
 	}
-	
+
 	@Override
 	public DeserializationResult getNextRecord(T target) throws IOException {
 		// always check the non-spanning wrapper first.
 		// this should be the majority of the cases for small records
 		// for large records, this portion of the work is very small in comparison anyways
-		
+
 		int nonSpanningRemaining = this.nonSpanningWrapper.remaining();
-		
+
 		// check if we can get a full length;
 		if (nonSpanningRemaining >= 4) {
 			int len = this.nonSpanningWrapper.readInt();
@@ -137,17 +136,17 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			this.nonSpanningWrapper.clear();
 			return DeserializationResult.PARTIAL_RECORD;
 		}
-		
+
 		// spanning record case
 		if (this.spanningWrapper.hasFullRecord()) {
 			// get the full record
 			target.read(this.spanningWrapper.getInputView());
-			
+
 			// move the remainder to the non-spanning wrapper
 			// this does not copy it, only sets the memory segment
 			this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
 			this.spanningWrapper.clear();
-			
+
 			return (this.nonSpanningWrapper.remaining() == 0) ?
 				DeserializationResult.LAST_RECORD_FROM_BUFFER :
 				DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
@@ -169,38 +168,38 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 
 	// -----------------------------------------------------------------------------------------------------------------
-	
+
 	private static final class NonSpanningWrapper implements DataInputView {
-		
+
 		private MemorySegment segment;
-		
+
 		private int limit;
-		
+
 		private int position;
-		
+
 		private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
 		private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
-		
+
 		int remaining() {
 			return this.limit - this.position;
 		}
-		
+
 		void clear() {
 			this.segment = null;
 			this.limit = 0;
 			this.position = 0;
 		}
-		
+
 		void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
 			this.segment = seg;
 			this.position = position;
 			this.limit = leftOverLimit;
 		}
-		
+
 		// -------------------------------------------------------------------------------------------------------------
 		//                                       DataInput specific methods
 		// -------------------------------------------------------------------------------------------------------------
-		
+
 		@Override
 		public final void readFully(byte[] b) throws IOException {
 			readFully(b, 0, b.length);
@@ -211,7 +210,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			if (off < 0 || len < 0 || off + len > b.length) {
 				throw new IndexOutOfBoundsException();
 			}
-			
+
 			this.segment.get(this.position, b, off, len);
 			this.position += len;
 		}
@@ -279,7 +278,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		@Override
 		public final String readLine() throws IOException {
 			final StringBuilder bld = new StringBuilder(32);
-			
+
 			try {
 				int b;
 				while ((b = readUnsignedByte()) != '\n') {
@@ -288,12 +287,12 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 					}
 				}
 			}
-			catch (EOFException eofex) {}
+			catch (EOFException ignored) {}
 
 			if (bld.length() == 0) {
 				return null;
 			}
-			
+
 			// trim a trailing carriage return
 			int len = bld.length();
 			if (len > 0 && bld.charAt(len - 1) == '\r') {
@@ -305,10 +304,10 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		@Override
 		public final String readUTF() throws IOException {
 			final int utflen = readUnsignedShort();
-			
+
 			final byte[] bytearr;
 			final char[] chararr;
-			
+
 			if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
 				bytearr = new byte[utflen];
 				this.utfByteBuffer = bytearr;
@@ -324,7 +323,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 			int c, char2, char3;
 			int count = 0;
-			int chararr_count = 0;
+			int chararrCount = 0;
 
 			readFully(bytearr, 0, utflen);
 
@@ -334,7 +333,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 					break;
 				}
 				count++;
-				chararr[chararr_count++] = (char) c;
+				chararr[chararrCount++] = (char) c;
 			}
 
 			while (count < utflen) {
@@ -349,7 +348,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 				case 6:
 				case 7:
 					count++;
-					chararr[chararr_count++] = (char) c;
+					chararr[chararrCount++] = (char) c;
 					break;
 				case 12:
 				case 13:
@@ -361,7 +360,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 					if ((char2 & 0xC0) != 0x80) {
 						throw new UTFDataFormatException("malformed input around byte " + count);
 					}
-					chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+					chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
 					break;
 				case 14:
 					count += 3;
@@ -373,22 +372,22 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 					if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
 						throw new UTFDataFormatException("malformed input around byte " + (count - 1));
 					}
-					chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
+					chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
 					break;
 				default:
 					throw new UTFDataFormatException("malformed input around byte " + count);
 				}
 			}
 			// The number of chars produced may be less than utflen
-			return new String(chararr, 0, chararr_count);
+			return new String(chararr, 0, chararrCount);
 		}
-		
+
 		@Override
 		public final int skipBytes(int n) throws IOException {
 			if (n < 0) {
 				throw new IllegalArgumentException();
 			}
-			
+
 			int toSkip = Math.min(n, remaining());
 			this.position += toSkip;
 			return toSkip;
@@ -398,27 +397,27 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		public void skipBytesToRead(int numBytes) throws IOException {
 			int skippedBytes = skipBytes(numBytes);
 
-			if(skippedBytes < numBytes){
+			if (skippedBytes < numBytes){
 				throw new EOFException("Could not skip " + numBytes + " bytes.");
 			}
 		}
 
 		@Override
 		public int read(byte[] b, int off, int len) throws IOException {
-			if(b == null){
+			if (b == null){
 				throw new NullPointerException("Byte array b cannot be null.");
 			}
 
-			if(off < 0){
+			if (off < 0){
 				throw new IllegalArgumentException("The offset off cannot be negative.");
 			}
 
-			if(len < 0){
+			if (len < 0){
 				throw new IllegalArgumentException("The length len cannot be negative.");
 			}
 
 			int toRead = Math.min(len, remaining());
-			this.segment.get(this.position,b,off, toRead);
+			this.segment.get(this.position, b, off, toRead);
 			this.position += toRead;
 
 			return toRead;
@@ -431,25 +430,25 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 	}
 
 	// -----------------------------------------------------------------------------------------------------------------
-	
+
 	private static final class SpanningWrapper {
-		
+
 		private final byte[] initialBuffer = new byte[1024];
-		
+
 		private final String[] tempDirs;
-		
+
 		private final Random rnd = new Random();
 
 		private final DataInputDeserializer serializationReadBuffer;
 
 		private final ByteBuffer lengthBuffer;
-		
+
 		private FileChannel spillingChannel;
-		
+
 		private byte[] buffer;
 
 		private int recordLength;
-		
+
 		private int accumulatedRecordBytes;
 
 		private MemorySegment leftOverData;
@@ -457,14 +456,14 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		private int leftOverStart;
 
 		private int leftOverLimit;
-		
+
 		private File spillFile;
-		
+
 		private DataInputViewStreamWrapper spillFileReader;
 
 		public SpanningWrapper(String[] tempDirs) {
 			this.tempDirs = tempDirs;
-			
+
 			this.lengthBuffer = ByteBuffer.allocate(4);
 			this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
 
@@ -473,17 +472,17 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			this.serializationReadBuffer = new DataInputDeserializer();
 			this.buffer = initialBuffer;
 		}
-		
+
 		private void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
 			// set the length and copy what is available to the buffer
 			this.recordLength = nextRecordLength;
-			
+
 			final int numBytesChunk = partial.remaining();
-			
+
 			if (nextRecordLength > THRESHOLD_FOR_SPILLING) {
 				// create a spilling channel and put the data there
 				this.spillingChannel = createSpillingChannel();
-				
+
 				ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
 				this.spillingChannel.write(toWrite);
 			}
@@ -492,23 +491,23 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 				ensureBufferCapacity(numBytesChunk);
 				partial.segment.get(partial.position, buffer, 0, numBytesChunk);
 			}
-			
+
 			this.accumulatedRecordBytes = numBytesChunk;
 		}
-		
+
 		private void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException {
 			// copy what we have to the length buffer
 			partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
 		}
-		
+
 		private void addNextChunkFromMemorySegment(MemorySegment segment, int numBytesInSegment) throws IOException {
 			int segmentPosition = 0;
-			
+
 			// check where to go. if we have a partial length, we need to complete it first
 			if (this.lengthBuffer.position() > 0) {
 				int toPut = Math.min(this.lengthBuffer.remaining(), numBytesInSegment);
 				segment.get(0, this.lengthBuffer, toPut);
-				
+
 				// did we complete the length?
 				if (this.lengthBuffer.hasRemaining()) {
 					return;
@@ -517,7 +516,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
 					this.lengthBuffer.clear();
 					segmentPosition = toPut;
-					
+
 					if (this.recordLength > THRESHOLD_FOR_SPILLING) {
 						this.spillingChannel = createSpillingChannel();
 					}
@@ -538,16 +537,16 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 				ensureBufferCapacity(accumulatedRecordBytes + toCopy);
 				segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
 			}
-			
+
 			this.accumulatedRecordBytes += toCopy;
-			
+
 			if (toCopy < available) {
 				// there is more data in the segment
 				this.leftOverData = segment;
 				this.leftOverStart = segmentPosition + toCopy;
 				this.leftOverLimit = numBytesInSegment;
 			}
-			
+
 			if (accumulatedRecordBytes == recordLength) {
 				// we have the full record
 				if (spillingChannel == null) {
@@ -561,19 +560,19 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 				}
 			}
 		}
-		
+
 		private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
 			deserializer.clear();
-			
+
 			if (leftOverData != null) {
 				deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
 			}
 		}
-		
+
 		private boolean hasFullRecord() {
 			return this.recordLength >= 0 && this.accumulatedRecordBytes >= this.recordLength;
 		}
-		
+
 		private int getNumGatheredBytes() {
 			return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 4 : lengthBuffer.position());
 		}
@@ -586,7 +585,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			this.lengthBuffer.clear();
 			this.leftOverData = null;
 			this.accumulatedRecordBytes = 0;
-			
+
 			if (spillingChannel != null) {
 				try {
 					spillingChannel.close();
@@ -610,16 +609,16 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 				spillFile = null;
 			}
 		}
-		
+
 		public DataInputView getInputView() {
 			if (spillFileReader == null) {
-				return serializationReadBuffer; 
+				return serializationReadBuffer;
 			}
 			else {
 				return spillFileReader;
 			}
 		}
-		
+
 		private void ensureBufferCapacity(int minLength) {
 			if (buffer.length < minLength) {
 				byte[] newBuffer = new byte[Math.max(minLength, buffer.length * 2)];
@@ -627,19 +626,19 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 				buffer = newBuffer;
 			}
 		}
-		
+
 		@SuppressWarnings("resource")
 		private FileChannel createSpillingChannel() throws IOException {
 			if (spillFile != null) {
 				throw new IllegalStateException("Spilling file already exists.");
 			}
-			
+
 			String directory = tempDirs[rnd.nextInt(tempDirs.length)];
 			spillFile = new File(directory, randomString(rnd) + ".inputchannel");
-			
+
 			return new RandomAccessFile(spillFile, "rw").getChannel();
 		}
-		
+
 		private static String randomString(Random random) {
 			final byte[] bytes = new byte[20];
 			random.nextBytes(bytes);

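The deserializer above switches between an in-memory byte array and an on-disk spill file once a record exceeds THRESHOLD_FOR_SPILLING (5 MiB). A stripped-down sketch of that decision, with the random temp-directory choice and file naming from the real class simplified away:

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;

    class SpillDecision {
        private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiB

        private byte[] buffer = new byte[1024];
        private FileChannel spillingChannel;

        /** Large records go to disk; small ones stay in a (possibly grown) heap buffer. */
        void startRecord(int recordLength, File tempDir) throws IOException {
            if (recordLength > THRESHOLD_FOR_SPILLING) {
                File spillFile = new File(tempDir, "record.inputchannel"); // naming simplified
                spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
            } else if (buffer.length < recordLength) {
                buffer = new byte[Math.max(recordLength, buffer.length * 2)]; // grow, as above
            }
        }
    }
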
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 0bf12c4..001de19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -56,10 +56,10 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	private final int numChannels;
 
-	/** {@link RecordSerializer} per outgoing channel */
+	/** {@link RecordSerializer} per outgoing channel. */
 	private final RecordSerializer<T>[] serializers;
 
-	private final Random RNG = new XORShiftRandom();
+	private final Random rng = new XORShiftRandom();
 
 	private Counter numBytesOut = new SimpleCounter();
 
@@ -74,7 +74,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 		this.numChannels = writer.getNumberOfSubpartitions();
 
-		/**
+		/*
 		 * The runtime exposes a channel abstraction for the produced results
 		 * (see {@link ChannelSelector}). Every channel has an independent
 		 * serializer.
@@ -102,10 +102,10 @@ public class RecordWriter<T extends IOReadableWritable> {
 	}
 
 	/**
-	 * This is used to send LatencyMarks to a random target channel
+	 * This is used to send LatencyMarks to a random target channel.
 	 */
 	public void randomEmit(T record) throws IOException, InterruptedException {
-		sendToTarget(record, RNG.nextInt(numChannels));
+		sendToTarget(record, rng.nextInt(numChannels));
 	}
 
 	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
@@ -203,7 +203,6 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	/**
 	 * Sets the metric group for this RecordWriter.
-	 * @param metrics
      */
 	public void setMetricGroup(TaskIOMetricGroup metrics) {
 		numBytesOut = metrics.getNumBytesOutCounter();
@@ -213,7 +212,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 	 * Writes the buffer to the {@link ResultPartitionWriter} and removes the
 	 * buffer from the serializer state.
 	 *
-	 * Needs to be synchronized on the serializer!
+	 * <p><b>Needs to be synchronized on the serializer!</b>
 	 */
 	private void writeAndClearBuffer(
 			Buffer buffer,

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
index c7d25e5..c707d47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
@@ -42,7 +41,6 @@ public class RoundRobinChannelSelector<T extends IOReadableWritable> implements
 		this.nextChannelToSendTo[0] = 0;
 	}
 
-
 	@Override
 	public int[] selectChannels(final T record, final int numberOfOutputChannels) {
 

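For context, a minimal sketch of the round-robin policy behind this class, matching the `selectChannels(T, int)` shape visible above but with the selector's state simplified to a single counter (the real class keeps it in a reusable array):

    // Illustrative only; not the actual RoundRobinChannelSelector implementation.
    class RoundRobinSketch<T> {
        private int nextChannelToSendTo = -1;

        int[] selectChannels(T record, int numberOfOutputChannels) {
            // Advance to the next channel, wrapping around at the channel count.
            nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfOutputChannels;
            return new int[] { nextChannelToSendTo };
        }
    }
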
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index 2f481b9..2ad50dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -18,14 +18,17 @@
 
 package org.apache.flink.runtime.io.network.api;
 
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 
 import org.junit.Test;
 
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link CheckpointBarrier} type.
+ */
 public class CheckpointBarrierTest {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
index c6c0645..d57675f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.util.event.EventListener;
+
 import org.junit.Test;
 import org.mockito.Matchers;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index 62d6aa5..de5f4a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -18,13 +18,6 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -34,10 +27,21 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
-
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+
 import org.junit.Test;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link EventSerializer}.
+ */
 public class EventSerializerTest {
 
 	@Test
@@ -78,12 +82,12 @@ public class EventSerializerTest {
 				new TestTaskEvent(Math.random(), 12361231273L),
 				new CancelCheckpointMarker(287087987329842L)
 		};
-		
+
 		for (AbstractEvent evt : events) {
 			ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
 			assertTrue(serializedEvent.hasRemaining());
 
-			AbstractEvent deserialized = 
+			AbstractEvent deserialized =
 					EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
 			assertNotNull(deserialized);
 			assertEquals(evt, deserialized);
@@ -94,8 +98,6 @@ public class EventSerializerTest {
 	 * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)}
 	 * whether it peeks into the buffer only, i.e. after the call, the buffer
 	 * is still de-serializable.
-	 *
-	 * @throws Exception
 	 */
 	@Test
 	public void testIsEventPeakOnly() throws Exception {
@@ -117,8 +119,6 @@ public class EventSerializerTest {
 	/**
 	 * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} returns
 	 * the correct answer for various encoded event buffers.
-	 *
-	 * @throws Exception
 	 */
 	@Test
 	public void testIsEvent() throws Exception {
@@ -151,7 +151,6 @@ public class EventSerializerTest {
 	 *
 	 * @return whether {@link EventSerializer#isEvent(ByteBuffer, Class, ClassLoader)}
 	 * 		thinks the encoded buffer matches the class
-	 * @throws IOException
 	 */
 	private boolean checkIsEvent(
 			AbstractEvent event,

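Import reordering aside, the behavior these tests pin down is a simple round trip: an event serialized via EventSerializer.toSerializedEvent must be restored intact by EventSerializer.fromSerializedEvent. A minimal sketch of that contract outside the test harness (the wrapper class and checkpoint id are hypothetical; the serializer calls are the ones visible in the hunk above):

import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;

import java.nio.ByteBuffer;

public class EventRoundTripSketch {

	public static void main(String[] args) throws Exception {
		AbstractEvent event = new CancelCheckpointMarker(42L);

		// serialize the event into a ByteBuffer ...
		ByteBuffer serialized = EventSerializer.toSerializedEvent(event);

		// ... and restore it with the current class loader
		AbstractEvent restored = EventSerializer.fromSerializedEvent(
				serialized, EventRoundTripSketch.class.getClassLoader());

		if (!event.equals(restored)) {
			throw new AssertionError("round trip lost information");
		}
	}
}
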
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
index aad7ee1..3f782fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.memory.AbstractPagedInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
-import org.apache.flink.runtime.memory.AbstractPagedInputView;
-import org.apache.flink.runtime.memory.AbstractPagedOutputView;
 
 import org.junit.Test;
 
@@ -39,14 +39,17 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for the {@link AbstractPagedInputView} and {@link AbstractPagedOutputView}.
+ */
 public class PagedViewsTest {
 
 	@Test
 	public void testSequenceOfIntegersWithAlignedBuffers() {
 		try {
-			final int NUM_INTS = 1000000;
+			final int numInts = 1000000;
 
-			testSequenceOfTypes(Util.randomRecords(NUM_INTS, SerializationTestTypeFactory.INT), 2048);
+			testSequenceOfTypes(Util.randomRecords(numInts, SerializationTestTypeFactory.INT), 2048);
 
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -57,9 +60,9 @@ public class PagedViewsTest {
 	@Test
 	public void testSequenceOfIntegersWithUnalignedBuffers() {
 		try {
-			final int NUM_INTS = 1000000;
+			final int numInts = 1000000;
 
-			testSequenceOfTypes(Util.randomRecords(NUM_INTS, SerializationTestTypeFactory.INT), 2047);
+			testSequenceOfTypes(Util.randomRecords(numInts, SerializationTestTypeFactory.INT), 2047);
 
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -70,10 +73,10 @@ public class PagedViewsTest {
 	@Test
 	public void testRandomTypes() {
 		try {
-			final int NUM_TYPES = 100000;
+			final int numTypes = 100000;
 
 			// test with an odd buffer size to force many unaligned cases
-			testSequenceOfTypes(Util.randomRecords(NUM_TYPES), 57);
+			testSequenceOfTypes(Util.randomRecords(numTypes), 57);
 
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -91,7 +94,7 @@ public class PagedViewsTest {
 
 		try {
 			outputView.write(expected);
-		}catch(Exception e){
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Unexpected exception: Could not write to TestOutputView.");
 		}
@@ -123,7 +126,7 @@ public class PagedViewsTest {
 
 		try {
 			outputView.write(expected);
-		}catch(Exception e){
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Unexpected exception: Could not write to TestOutputView.");
 		}
@@ -156,7 +159,7 @@ public class PagedViewsTest {
 
 		try {
 			outputView.write(expected);
-		}catch(Exception e){
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Unexpected exception: Could not write to TestOutputView.");
 		}
@@ -178,7 +181,7 @@ public class PagedViewsTest {
 		assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % segmentSize);
 
 		byte[] tempBuffer = new byte[bytesRead];
-		System.arraycopy(buffer,0,tempBuffer,0,bytesRead);
+		System.arraycopy(buffer, 0, tempBuffer, 0, bytesRead);
 		assertArrayEquals(expected, tempBuffer);
 	}
 
@@ -194,7 +197,7 @@ public class PagedViewsTest {
 
 		try {
 			outputView.write(expected);
-		}catch(Exception e){
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Unexpected exception: Could not write to TestOutputView.");
 		}
@@ -215,12 +218,12 @@ public class PagedViewsTest {
 		assertEquals(bytes2Write, bytesRead);
 
 		byte[] tempBuffer = new byte[bytesRead];
-		System.arraycopy(buffer,0,tempBuffer,0,bytesRead);
+		System.arraycopy(buffer, 0, tempBuffer, 0, bytesRead);
 		assertArrayEquals(expected, tempBuffer);
 
-		try{
+		try {
 			bytesRead = inputView.read(buffer);
-		}catch(IOException e){
+		} catch (IOException e) {
 			e.printStackTrace();
 			fail("Unexpected exception: Input view should be empty and thus return -1.");
 		}
@@ -241,7 +244,7 @@ public class PagedViewsTest {
 
 		try {
 			outputView.write(expected);
-		}catch(Exception e){
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Unexpected exception: Could not write to TestOutputView.");
 		}
@@ -254,7 +257,7 @@ public class PagedViewsTest {
 
 		try {
 			inputView.readFully(buffer);
-		}catch(EOFException e){
+		} catch (EOFException e) {
 			//Expected exception
 			eofException = true;
 		}
@@ -267,9 +270,9 @@ public class PagedViewsTest {
 
 		int bytesRead = 0;
 
-		try{
-			bytesRead =inputView.read(buffer);
-		}catch(Exception e){
+		try {
+			bytesRead = inputView.read(buffer);
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Unexpected exception: Could not read TestInputView.");
 		}
@@ -288,7 +291,7 @@ public class PagedViewsTest {
 
 		try {
 			outputView.write(expected);
-		}catch(Exception e){
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Unexpected exception: Could not write to TestOutputView.");
 		}
@@ -296,7 +299,7 @@ public class PagedViewsTest {
 		outputView.close();
 
 		TestInputView inputView = new TestInputView(outputView.segments);
-		byte[] buffer = new byte[2*bufferSize];
+		byte[] buffer = new byte[2 * bufferSize];
 
 		try {
 			inputView.readFully(buffer, bufferSize, bufferSize);
@@ -307,7 +310,7 @@ public class PagedViewsTest {
 
 		assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % segmentSize);
 		byte[] tempBuffer = new byte[bufferSize];
-		System.arraycopy(buffer, bufferSize, tempBuffer,0, bufferSize);
+		System.arraycopy(buffer, bufferSize, tempBuffer, 0, bufferSize);
 		assertArrayEquals(expected, tempBuffer);
 	}
 
@@ -321,12 +324,12 @@ public class PagedViewsTest {
 		byte[] buffer = new byte[segmentSize];
 		boolean eofException = false;
 
-		try{
+		try {
 			inputView.readFully(buffer);
-		}catch(EOFException e){
+		} catch (EOFException e) {
 			//expected Exception
 			eofException = true;
-		}catch(Exception e){
+		} catch (Exception e) {
 			e.printStackTrace();
 			fail("Unexpected exception: Could not read TestInputView.");
 		}
@@ -334,10 +337,9 @@ public class PagedViewsTest {
 		assertTrue("EOFException expected.", eofException);
 	}
 
-
 	private static void testSequenceOfTypes(Iterable<SerializationTestType> sequence, int segmentSize) throws Exception {
 
-		List<SerializationTestType> elements = new ArrayList<SerializationTestType>(512);
+		List<SerializationTestType> elements = new ArrayList<>(512);
 		TestOutputView outView = new TestOutputView(segmentSize);
 
 		// write
@@ -373,7 +375,7 @@ public class PagedViewsTest {
 
 	private static final class TestOutputView extends AbstractPagedOutputView {
 
-		private final List<SegmentWithPosition> segments = new ArrayList<SegmentWithPosition>();
+		private final List<SegmentWithPosition> segments = new ArrayList<>();
 
 		private final int segmentSize;
 
@@ -400,7 +402,6 @@ public class PagedViewsTest {
 
 		private int num;
 
-
 		private TestInputView(List<SegmentWithPosition> segments) {
 			super(segments.get(0).segment, segments.get(0).position, 0);
 

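Most of the churn in this file is the WhitespaceAround fix for try/catch blocks, turning "}catch(Exception e){" into "} catch (Exception e) {". The target shape, as a self-contained sketch (hypothetical class, not part of the commit) mirroring the readFully/EOFException pattern the tests use:

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

public class CatchStyleSketch {

	static boolean tryReadFully(DataInputStream in, byte[] buffer) {
		try {
			in.readFully(buffer);
			return true;
		} catch (EOFException e) {
			// expected when fewer than buffer.length bytes remain
			return false;
		} catch (IOException e) {
			// note the single space before 'catch', around the parentheses, and before '{'
			throw new RuntimeException("Unexpected exception while reading.", e);
		}
	}
}
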
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index ed0ce6c..ebe588c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -30,16 +30,19 @@ import java.util.ArrayDeque;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
+/**
+ * Tests for the {@link SpillingAdaptiveSpanningRecordDeserializer} and {@link AdaptiveSpanningRecordDeserializer}.
+ */
 public class SpanningRecordSerializationTest {
 
 	@Test
 	public void testIntRecordsSpanningMultipleSegments() {
-		final int SEGMENT_SIZE = 1;
-		final int NUM_VALUES = 10;
+		final int segmentSize = 1;
+		final int numValues = 10;
 
 		try {
-			testNonSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
-			testSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+			testNonSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
+			testSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -49,12 +52,12 @@ public class SpanningRecordSerializationTest {
 
 	@Test
 	public void testIntRecordsWithAlignedBuffers () {
-		final int SEGMENT_SIZE = 64;
-		final int NUM_VALUES = 64;
+		final int segmentSize = 64;
+		final int numValues = 64;
 
 		try {
-			testNonSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
-			testSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+			testNonSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
+			testSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -64,12 +67,12 @@ public class SpanningRecordSerializationTest {
 
 	@Test
 	public void testIntRecordsWithUnalignedBuffers () {
-		final int SEGMENT_SIZE = 31;
-		final int NUM_VALUES = 248;
+		final int segmentSize = 31;
+		final int numValues = 248;
 
 		try {
-			testNonSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
-			testSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+			testNonSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
+			testSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -78,13 +81,13 @@ public class SpanningRecordSerializationTest {
 	}
 
 	@Test
-	 public void testRandomRecords () {
-		final int SEGMENT_SIZE = 127;
-		final int NUM_VALUES = 10000;
+	public void testRandomRecords () {
+		final int segmentSize = 127;
+		final int numValues = 10000;
 
 		try {
-			testNonSpillingDeserializer(Util.randomRecords(NUM_VALUES), SEGMENT_SIZE);
-			testSpillingDeserializer(Util.randomRecords(NUM_VALUES), SEGMENT_SIZE);
+			testNonSpillingDeserializer(Util.randomRecords(numValues), segmentSize);
+			testSpillingDeserializer(Util.randomRecords(numValues), segmentSize);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -95,38 +98,37 @@ public class SpanningRecordSerializationTest {
 	// -----------------------------------------------------------------------------------------------------------------
 
 	private void testNonSpillingDeserializer(Util.MockRecords records, int segmentSize) throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>();
-		
+		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
+		RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<>();
+
 		test(records, segmentSize, serializer, deserializer);
 	}
-	
+
 	private void testSpillingDeserializer(Util.MockRecords records, int segmentSize) throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		RecordDeserializer<SerializationTestType> deserializer = 
-				new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(
+		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
+		RecordDeserializer<SerializationTestType> deserializer =
+				new SpillingAdaptiveSpanningRecordDeserializer<>(
 						new String[] { System.getProperty("java.io.tmpdir") });
-		
+
 		test(records, segmentSize, serializer, deserializer);
 	}
-	
+
 	/**
 	 * Iterates over the provided records and tests whether {@link SpanningRecordSerializer} and {@link AdaptiveSpanningRecordDeserializer}
 	 * interact as expected.
-	 * <p>
-	 * Only a single {@link MemorySegment} will be allocated.
+	 *
+	 * <p>Only a single {@link MemorySegment} will be allocated.
 	 *
 	 * @param records records to test
 	 * @param segmentSize size for the {@link MemorySegment}
 	 */
-	private void test(Util.MockRecords records, int segmentSize, 
+	private void test(Util.MockRecords records, int segmentSize,
 			RecordSerializer<SerializationTestType> serializer,
-			RecordDeserializer<SerializationTestType> deserializer)
-		throws Exception
-	{
-		final int SERIALIZATION_OVERHEAD = 4; // length encoding
+			RecordDeserializer<SerializationTestType> deserializer) throws Exception {
 
-		final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>();
+		final int serializationOverhead = 4; // length encoding
+
+		final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
 
 		// -------------------------------------------------------------------------------------------------------------
 
@@ -139,7 +141,7 @@ public class SpanningRecordSerializationTest {
 			serializedRecords.add(record);
 
 			numRecords++;
-			numBytes += record.length() + SERIALIZATION_OVERHEAD;
+			numBytes += record.length() + serializationOverhead;
 
 			// serialize record
 			if (serializer.addRecord(record).isFullBuffer()) {
@@ -162,9 +164,6 @@ public class SpanningRecordSerializationTest {
 				while (serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)).isFullBuffer()) {
 					deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize);
 				}
-
-
-
 			}
 		}
 
@@ -184,7 +183,6 @@ public class SpanningRecordSerializationTest {
 			numRecords--;
 		}
 
-
 		// assert that all records have been serialized and deserialized
 		Assert.assertEquals(0, numRecords);
 		Assert.assertFalse(serializer.hasData());

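Indentation aside, the hunks above leave the test's core push loop unchanged: records are fed to the serializer until a buffer fills, and every full backing segment is handed over to the deserializer. Condensed into one helper, using only the calls visible in the diff (a sketch assuming the test's own imports; createBufferBuilder is the static helper from BufferBuilderTestUtils):

	private static void pushRecords(
			Util.MockRecords records,
			int segmentSize,
			RecordSerializer<SerializationTestType> serializer,
			RecordDeserializer<SerializationTestType> deserializer) throws Exception {

		for (SerializationTestType record : records) {
			// isFullBuffer() signals that the current backing segment is exhausted
			// and should be handed to the reading side
			if (serializer.addRecord(record).isFullBuffer()) {
				deserializer.setNextMemorySegment(
						serializer.getCurrentBuffer().getMemorySegment(), segmentSize);

				// a spanning record may fill several fresh buffers in a row
				while (serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)).isFullBuffer()) {
					deserializer.setNextMemorySegment(
							serializer.getCurrentBuffer().getMemorySegment(), segmentSize);
				}
			}
		}
	}
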
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
index ed6677e..955fc39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
@@ -33,13 +33,16 @@ import java.util.Random;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
+/**
+ * Tests for the {@link SpanningRecordSerializer}.
+ */
 public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testHasData() {
-		final int SEGMENT_SIZE = 16;
+		final int segmentSize = 16;
 
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
 
 		Assert.assertFalse(serializer.hasData());
@@ -48,13 +51,13 @@ public class SpanningRecordSerializerTest {
 			serializer.addRecord(randomIntRecord);
 			Assert.assertTrue(serializer.hasData());
 
-			serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
+			serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 			Assert.assertTrue(serializer.hasData());
 
 			serializer.clear();
 			Assert.assertFalse(serializer.hasData());
 
-			serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
+			serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 
 			serializer.addRecord(randomIntRecord);
 			Assert.assertTrue(serializer.hasData());
@@ -70,16 +73,17 @@ public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testEmptyRecords() {
-		final int SEGMENT_SIZE = 11;
+		final int segmentSize = 11;
 
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 
 		try {
 			Assert.assertEquals(
 				RecordSerializer.SerializationResult.FULL_RECORD,
-				serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)));
+				serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)));
 		} catch (IOException e) {
 			e.printStackTrace();
+			Assert.fail(e.getMessage());
 		}
 
 		try {
@@ -120,7 +124,7 @@ public class SpanningRecordSerializerTest {
 			result = serializer.addRecord(emptyRecord);
 			Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
 
-			result = serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
+			result = serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 			Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
 		}
 		catch (Exception e) {
@@ -131,11 +135,11 @@ public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testIntRecordsSpanningMultipleSegments() {
-		final int SEGMENT_SIZE = 1;
-		final int NUM_VALUES = 10;
+		final int segmentSize = 1;
+		final int numValues = 10;
 
 		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+			test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -145,11 +149,11 @@ public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testIntRecordsWithAlignedSegments() {
-		final int SEGMENT_SIZE = 64;
-		final int NUM_VALUES = 64;
+		final int segmentSize = 64;
+		final int numValues = 64;
 
 		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+			test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -159,11 +163,11 @@ public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testIntRecordsWithUnalignedSegments() {
-		final int SEGMENT_SIZE = 31;
-		final int NUM_VALUES = 248; // least common multiple => last record should align
+		final int segmentSize = 31;
+		final int numValues = 248; // least common multiple => last record should align
 
 		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+			test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -173,11 +177,11 @@ public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testRandomRecords() {
-		final int SEGMENT_SIZE = 127;
-		final int NUM_VALUES = 100000;
+		final int segmentSize = 127;
+		final int numValues = 100000;
 
 		try {
-			test(Util.randomRecords(NUM_VALUES), SEGMENT_SIZE);
+			test(Util.randomRecords(numValues), segmentSize);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -190,16 +194,16 @@ public class SpanningRecordSerializerTest {
 	/**
 	 * Iterates over the provided records and tests whether the {@link SpanningRecordSerializer} returns the expected
 	 * {@link RecordSerializer.SerializationResult} values.
-	 * <p>
-	 * Only a single {@link MemorySegment} will be allocated.
+	 *
+	 * <p>Only a single {@link MemorySegment} will be allocated.
 	 *
 	 * @param records records to test
 	 * @param segmentSize size for the {@link MemorySegment}
 	 */
 	private void test(Util.MockRecords records, int segmentSize) throws Exception {
-		final int SERIALIZATION_OVERHEAD = 4; // length encoding
+		final int serializationOverhead = 4; // length encoding
 
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 
 		// -------------------------------------------------------------------------------------------------------------
 
@@ -208,7 +212,7 @@ public class SpanningRecordSerializerTest {
 		int numBytes = 0;
 		for (SerializationTestType record : records) {
 			RecordSerializer.SerializationResult result = serializer.addRecord(record);
-			numBytes += record.length() + SERIALIZATION_OVERHEAD;
+			numBytes += record.length() + serializationOverhead;
 
 			if (numBytes < segmentSize) {
 				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);

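A quick check of the "least common multiple" comment in testIntRecordsWithUnalignedSegments: each serialized INT record occupies 4 payload bytes plus the 4-byte length field (the serializationOverhead above), i.e. 8 bytes. With 31-byte segments, lcm(8, 31) = 248, so record and segment boundaries coincide every 248 bytes (31 records); 248 records total 1984 bytes, exactly 64 segments, which is why the last record should align with a segment boundary.
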
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index e04809c..4fac5b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -35,8 +35,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
@@ -80,6 +78,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link RecordWriter}.
+ */
 @PrepareForTest({EventSerializer.class})
 @RunWith(PowerMockRunner.class)
 public class RecordWriterTest {
@@ -311,7 +312,7 @@ public class RecordWriterTest {
 		ResultPartitionWriter partitionWriter =
 			spy(new RecyclingPartitionWriter(new TestPooledBufferProvider(1, 16)));
 
-		RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
+		RecordWriter<IntValue> recordWriter = new RecordWriter<>(partitionWriter);
 
 		// Fill a buffer, but don't write it out.
 		recordWriter.emit(new IntValue(0));
@@ -432,8 +433,6 @@ public class RecordWriterTest {
 	/**
 	 * Tests that event buffers are properly recycled when broadcasting events
 	 * to multiple channels.
-	 *
-	 * @throws Exception
 	 */
 	@Test
 	public void testBroadcastEventBufferReferenceCounting() throws Exception {
@@ -493,6 +492,7 @@ public class RecordWriterTest {
 		buffer1.setReaderIndex(1);
 		assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 	}
+
 	/**
 	 * Tests that broadcasted records' buffers are independent (in their (reader) indices) once they
 	 * are put into the queue for Netty when broadcasting events to multiple channels.
@@ -527,36 +527,6 @@ public class RecordWriterTest {
 	// Helpers
 	// ---------------------------------------------------------------------------------------------
 
-	private BufferProvider createBufferProvider(final int bufferSize)
-			throws IOException, InterruptedException {
-
-		BufferProvider bufferProvider = mock(BufferProvider.class);
-		when(bufferProvider.requestBufferBlocking()).thenAnswer(
-				new Answer<Buffer>() {
-					@Override
-					public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
-						MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
-						Buffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
-						return buffer;
-					}
-				}
-		);
-
-		return bufferProvider;
-	}
-
-	private BufferProvider createBufferProvider(Buffer... buffers)
-			throws IOException, InterruptedException {
-
-		BufferProvider bufferProvider = mock(BufferProvider.class);
-
-		for (int i = 0; i < buffers.length; i++) {
-			when(bufferProvider.requestBufferBlocking()).thenReturn(buffers[i]);
-		}
-
-		return bufferProvider;
-	}
-
 	/**
 	 * Partition writer that collects the added buffers/events in multiple queues.
 	 */

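The two createBufferProvider helpers deleted above were no longer referenced by any test, and removing them is what leaves the FreeingBufferRecycler and NetworkBuffer imports at the top of this hunk unused, so both deletions belong to the same cleanup.
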
http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/tools/maven/suppressions-runtime.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index 9d9cb87..33a92e3 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -87,11 +87,11 @@ under the License.
 		files="(.*)test[/\\](.*)runtime[/\\]io[/\\](async|disk)[/\\](.*)"
 		checks="AvoidStarImport|UnusedImports"/>
 	<suppress
-		files="(.*)runtime[/\\]io[/\\]network[/\\](api|buffer|netty|partition|serialization|util)[/\\](.*)"
+		files="(.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
 	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
 	<suppress
-		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](api|buffer|netty|partition|serialization|util)[/\\](.*)"
+		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
 		checks="AvoidStarImport|UnusedImports"/>
 	<!--Test class copied from the netty project-->
 	<suppress

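The suppression change is what puts all of the fixes above under enforcement: dropping "api" from the two file patterns means the listed checks now run against runtime/io/network/api for main sources, plus AvoidStarImport/UnusedImports for test sources, so the package must stay checkstyle-clean from this commit on.
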
