From: sewen@apache.org
To: commits@flink.apache.org
Date: Thu, 01 Feb 2018 15:46:52 -0000
Subject: [20/22] flink git commit: [hotfix] [runtime] Fix checkstyle for 'runtime/io/network/api'.

[hotfix] [runtime] Fix checkstyle for 'runtime/io/network/api'.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4675f2a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4675f2a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4675f2a

Branch: refs/heads/master
Commit: b4675f2a3aa1b5a1635923b8af7dbea88902154a
Parents: 4e481a7
Author: Stephan Ewen
Authored: Fri Jan 26 10:42:34 2018 +0100
Committer: Stephan Ewen
Committed: Thu Feb 1 13:54:56 2018 +0100

----------------------------------------------------------------------
 .../io/network/api/CancelCheckpointMarker.java  |   8 +-
 .../io/network/api/CheckpointBarrier.java       |  18 +--
 .../io/network/api/EndOfPartitionEvent.java     |  10 +-
 .../io/network/api/EndOfSuperstepEvent.java     |  10 +-
 .../io/network/api/TaskEventHandler.java        |   2 +-
 .../io/network/api/reader/AbstractReader.java   |   7 +-
 .../api/reader/AbstractRecordReader.java        |   4 +-
 .../io/network/api/reader/MutableReader.java    |   4 +-
 .../network/api/reader/MutableRecordReader.java |   7 +-
 .../runtime/io/network/api/reader/Reader.java   |   4 +-
 .../io/network/api/reader/ReaderBase.java       |   4 +-
 .../io/network/api/reader/RecordReader.java     |   6 +-
 .../AdaptiveSpanningRecordDeserializer.java     |  31 ++--
 .../api/serialization/EventSerializer.java      |   2 +-
 .../api/serialization/RecordDeserializer.java   |  14 +-
 .../api/serialization/RecordSerializer.java     |  14 +-
 .../serialization/SpanningRecordSerializer.java |  17 +--
 ...llingAdaptiveSpanningRecordDeserializer.java | 153 +++++++++----------
 .../io/network/api/writer/RecordWriter.java     |  13 +-
 .../api/writer/RoundRobinChannelSelector.java   |   2 -
 .../io/network/api/CheckpointBarrierTest.java   |   5 +-
 .../network/api/reader/AbstractReaderTest.java  |   1 +
 .../api/serialization/EventSerializerTest.java  |  29 ++--
 .../api/serialization/PagedViewsTest.java       |  63 ++++----
 .../SpanningRecordSerializationTest.java        |  76 +++++----
 .../SpanningRecordSerializerTest.java           |  54 ++++---
 .../io/network/api/writer/RecordWriterTest.java |  40 +---
 tools/maven/suppressions-runtime.xml            |   4 +-
 28 files changed, 289 insertions(+), 313 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java index 52a2517..207df52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java @@ -31,7 +31,7 @@ import java.io.IOException; */ public class CancelCheckpointMarker extends RuntimeEvent { - /** The id of the checkpoint to be canceled */ + /** The id of the checkpoint to be canceled. */ private final long checkpointId; public CancelCheckpointMarker(long checkpointId) { @@ -44,7 +44,7 @@ public class CancelCheckpointMarker extends RuntimeEvent { // ------------------------------------------------------------------------ // These known and common event go through special code paths, rather than - // through generic serialization.
@Override public void write(DataOutputView out) throws IOException { @@ -55,7 +55,7 @@ public class CancelCheckpointMarker extends RuntimeEvent { public void read(DataInputView in) throws IOException { throw new UnsupportedOperationException("this method should never be called"); } - + // ------------------------------------------------------------------------ @Override @@ -65,7 +65,7 @@ public class CancelCheckpointMarker extends RuntimeEvent { @Override public boolean equals(Object other) { - return other != null && + return other != null && other.getClass() == CancelCheckpointMarker.class && this.checkpointId == ((CancelCheckpointMarker) other).checkpointId; } http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java index 97ad90f..78d6707 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java @@ -18,28 +18,28 @@ package org.apache.flink.runtime.io.network.api; -import static org.apache.flink.util.Preconditions.checkNotNull; - -import java.io.IOException; - import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.event.RuntimeEvent; +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Checkpoint barriers are used to align checkpoints throughout the streaming topology. The * barriers are emitted by the sources when instructed to do so by the JobManager. When - * operators receive a CheckpointBarrier on one of its inputs, it knows that this is the point + * operators receive a CheckpointBarrier on one of its inputs, it knows that this is the point * between the pre-checkpoint and post-checkpoint data. - * + * *
<p>
Once an operator has received a checkpoint barrier from all its input channels, it * knows that a certain checkpoint is complete. It can trigger the operator specific checkpoint * behavior and broadcast the barrier to downstream operators. - * + * *
<p>
Depending on the semantic guarantees, may hold off post-checkpoint data until the checkpoint * is complete (exactly once). - * + * *
<p>
The checkpoint barrier IDs are strictly monotonous increasing. */ public class CheckpointBarrier extends RuntimeEvent { @@ -75,7 +75,7 @@ public class CheckpointBarrier extends RuntimeEvent { // but would require the CheckpointBarrier to be mutable. Since all serialization // for events goes through the EventSerializer class, which has special serialization // for the CheckpointBarrier, we don't need these methods - // + // @Override public void write(DataOutputView out) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java index 293287b..3422341 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java @@ -27,16 +27,16 @@ import org.apache.flink.runtime.event.RuntimeEvent; */ public class EndOfPartitionEvent extends RuntimeEvent { - /** The singleton instance of this event */ + /** The singleton instance of this event. */ public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent(); - + // ------------------------------------------------------------------------ // not instantiable private EndOfPartitionEvent() {} - + // ------------------------------------------------------------------------ - + @Override public void read(DataInputView in) { // Nothing to do here @@ -48,7 +48,7 @@ public class EndOfPartitionEvent extends RuntimeEvent { } // ------------------------------------------------------------------------ - + @Override public int hashCode() { return 1965146673; http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java index 7f51187..0603e32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java @@ -27,16 +27,16 @@ import org.apache.flink.runtime.event.RuntimeEvent; */ public class EndOfSuperstepEvent extends RuntimeEvent { - /** The singleton instance of this event */ + /** The singleton instance of this event. */ public static final EndOfSuperstepEvent INSTANCE = new EndOfSuperstepEvent(); // ------------------------------------------------------------------------ - - // not instantiable + + /** This class is not meant to be instantiated. 
*/ private EndOfSuperstepEvent() {} - + // ------------------------------------------------------------------------ - + @Override public void write(DataOutputView out) {} http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java index 4121587..16ca5a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/TaskEventHandler.java @@ -30,7 +30,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Multimap; */ public class TaskEventHandler { - /** Listeners for each event type */ + /** Listeners for each event type. */ private final Multimap, EventListener> listeners = HashMultimap.create(); public void subscribe(EventListener listener, Class eventType) { http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java index 3a343bf..aaa21e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractReader.java @@ -110,7 +110,7 @@ public abstract class AbstractReader implements ReaderBase { throw new IOException("Error while handling event of type " + eventType + ": " + t.getMessage(), t); } } - + public void publish(TaskEvent event){ taskEventHandler.publish(event); } @@ -134,11 +134,8 @@ public abstract class AbstractReader implements ReaderBase { @Override public boolean hasReachedEndOfSuperstep() { - if (isIterative) { - return currentNumberOfEndOfSuperstepEvents == inputGate.getNumberOfInputChannels(); - } + return isIterative && currentNumberOfEndOfSuperstepEvents == inputGate.getNumberOfInputChannels(); - return false; } private boolean incrementEndOfSuperstepEventAndCheck() { http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index 1ac5f75..e3c8484 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -30,8 +30,8 @@ import java.io.IOException; /** * A record-oriented reader. - *
<p>
- * This abstract base class is used by both the mutable and immutable record readers. + * + *
<p>
This abstract base class is used by both the mutable and immutable record readers. * * @param The type of the record that can be read with this record reader. */ http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java index e47982e..a03fc4c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.io.network.api.reader; -import java.io.IOException; - import org.apache.flink.core.io.IOReadableWritable; +import java.io.IOException; + /** * A record-oriented reader for mutable record types. */ http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java index 9836ba4..e5242e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java @@ -23,12 +23,17 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import java.io.IOException; +/** + * Implementation of the record-oriented reader for mutable record types. + * + * @param The type of the record that is read. + */ public class MutableRecordReader extends AbstractRecordReader implements MutableReader { /** * Creates a new MutableRecordReader that de-serializes records from the given input gate and * can spill partial records to disk, if they grow large. - * + * * @param inputGate The input gate to read from. * @param tmpDirectories The temp directories. USed for spilling if the reader concurrently * reconstructs multiple large records. http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java index 7255390..8295980 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.io.network.api.reader; -import java.io.IOException; - import org.apache.flink.core.io.IOReadableWritable; +import java.io.IOException; + /** * A record-oriented reader for immutable record types. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java index 0cc77f0..6671012 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java @@ -18,11 +18,11 @@ package org.apache.flink.runtime.io.network.api.reader; -import java.io.IOException; - import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.util.event.EventListener; +import java.io.IOException; + /** * The basic API for every reader. */ http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java index 9eed374..4bcf8be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java @@ -23,6 +23,11 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import java.io.IOException; +/** + * Record oriented reader for immutable types. + * + * @param Thy type of the records that is read. 
+ */ public class RecordReader extends AbstractRecordReader implements Reader { private final Class recordType; @@ -85,5 +90,4 @@ public class RecordReader extends AbstractRecordRe throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e); } } - } http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java index 04a7a21..598216a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java @@ -19,11 +19,11 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.core.memory.DataInputDeserializer; -import org.apache.flink.core.memory.DataOutputSerializer; import java.io.EOFException; import java.io.IOException; @@ -34,9 +34,6 @@ import java.nio.ByteOrder; /** * @param The type of the record to be deserialized. */ -/** - * @param The type of the record to be deserialized. 
- */ public class AdaptiveSpanningRecordDeserializer implements RecordDeserializer { private final NonSpanningWrapper nonSpanningWrapper; @@ -264,7 +261,7 @@ public class AdaptiveSpanningRecordDeserializer im } } } - catch (EOFException eofex) {} + catch (EOFException ignored) {} if (bld.length() == 0) { return null; @@ -300,7 +297,7 @@ public class AdaptiveSpanningRecordDeserializer im int c, char2, char3; int count = 0; - int chararr_count = 0; + int chararrCount = 0; readFully(bytearr, 0, utflen); @@ -310,7 +307,7 @@ public class AdaptiveSpanningRecordDeserializer im break; } count++; - chararr[chararr_count++] = (char) c; + chararr[chararrCount++] = (char) c; } while (count < utflen) { @@ -325,7 +322,7 @@ public class AdaptiveSpanningRecordDeserializer im case 6: case 7: count++; - chararr[chararr_count++] = (char) c; + chararr[chararrCount++] = (char) c; break; case 12: case 13: @@ -337,7 +334,7 @@ public class AdaptiveSpanningRecordDeserializer im if ((char2 & 0xC0) != 0x80) { throw new UTFDataFormatException("malformed input around byte " + count); } - chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); + chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F)); break; case 14: count += 3; @@ -349,14 +346,14 @@ public class AdaptiveSpanningRecordDeserializer im if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { throw new UTFDataFormatException("malformed input around byte " + (count - 1)); } - chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)); + chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F))); break; default: throw new UTFDataFormatException("malformed input around byte " + count); } } // The number of chars produced may be less than utflen - return new String(chararr, 0, chararr_count); + return new String(chararr, 0, chararrCount); } @Override @@ -374,27 +371,27 @@ public class AdaptiveSpanningRecordDeserializer im public void skipBytesToRead(int numBytes) throws IOException { int skippedBytes = skipBytes(numBytes); - if(skippedBytes < numBytes){ + if (skippedBytes < numBytes){ throw new EOFException("Could not skip " + numBytes + " bytes."); } } @Override public int read(byte[] b, int off, int len) throws IOException { - if(b == null){ + if (b == null){ throw new NullPointerException("Byte array b cannot be null."); } - if(off < 0){ + if (off < 0){ throw new IllegalArgumentException("The offset off cannot be negative."); } - if(len < 0){ + if (len < 0){ throw new IllegalArgumentException("The length len cannot be negative."); } int toRead = Math.min(len, remaining()); - this.segment.get(this.position,b,off, toRead); + this.segment.get(this.position, b, off, toRead); this.position += toRead; return toRead; http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java index af0bfe1..f0123c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java @@ -293,7 +293,7 @@ public class 
EventSerializer { final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event); MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array()); - + final Buffer buffer = new NetworkBuffer(data, FreeingBufferRecycler.INSTANCE, false); buffer.setSize(serializedEvent.remaining()); http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java index dd8ea06..4f48d86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java @@ -16,21 +16,23 @@ * limitations under the License. */ - package org.apache.flink.runtime.io.network.api.serialization; -import java.io.IOException; - import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; +import java.io.IOException; + /** * Interface for turning sequences of memory segments into records. */ public interface RecordDeserializer { - public static enum DeserializationResult { + /** + * Status of the deserialization result. + */ + enum DeserializationResult { PARTIAL_RECORD(false, true), INTERMEDIATE_RECORD_FROM_BUFFER(true, false), LAST_RECORD_FROM_BUFFER(true, true); @@ -52,7 +54,7 @@ public interface RecordDeserializer { return this.isBufferConsumed; } } - + DeserializationResult getNextRecord(T target) throws IOException; void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException; @@ -62,6 +64,6 @@ public interface RecordDeserializer { Buffer getCurrentBuffer(); void clear(); - + boolean hasUnfinishedData(); } http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index f09bd4a..1eefc79 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.io.IOReadableWritable; @@ -30,16 +29,19 @@ import java.io.IOException; */ public interface RecordSerializer { + /** + * Status of the serialization result. 
+ */ enum SerializationResult { PARTIAL_RECORD_MEMORY_SEGMENT_FULL(false, true), FULL_RECORD_MEMORY_SEGMENT_FULL(true, true), FULL_RECORD(true, false); - + private final boolean isFullRecord; private final boolean isFullBuffer; - - private SerializationResult(boolean isFullRecord, boolean isFullBuffer) { + + SerializationResult(boolean isFullRecord, boolean isFullBuffer) { this.isFullRecord = isFullRecord; this.isFullBuffer = isFullBuffer; } @@ -71,7 +73,6 @@ public interface RecordSerializer { * @param record the record to serialize * @return how much information was written to the target buffer and * whether this buffer is full - * @throws IOException */ SerializationResult addRecord(T record) throws IOException; @@ -82,7 +83,6 @@ public interface RecordSerializer { * @param bufferBuilder the new target buffer to use * @return how much information was written to the target buffer and * whether this buffer is full - * @throws IOException */ SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException; @@ -90,7 +90,7 @@ public interface RecordSerializer { * Retrieves the current target buffer and sets its size to the actual * number of written bytes. * - * After calling this method, a new target buffer is required to continue + *
<p>
After calling this method, a new target buffer is required to continue * writing (see {@link #setNextBufferBuilder(BufferBuilder)}). * * @return the target buffer that was used http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 5e6f8e2..768c43e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -35,23 +35,23 @@ import java.nio.ByteOrder; * data serialization buffer and copies this buffer to target buffers * one-by-one using {@link #setNextBufferBuilder(BufferBuilder)}. * - * @param + * @param The type of the records that are serialized. */ public class SpanningRecordSerializer implements RecordSerializer { - /** Flag to enable/disable checks, if buffer not set/full or pending serialization */ + /** Flag to enable/disable checks, if buffer not set/full or pending serialization. */ private static final boolean CHECKED = false; - /** Intermediate data serialization */ + /** Intermediate data serialization. */ private final DataOutputSerializer serializationBuffer; - /** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}) */ + /** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}). */ private ByteBuffer dataBuffer; - /** Intermediate buffer for length serialization */ + /** Intermediate buffer for length serialization. */ private final ByteBuffer lengthBuffer; - /** Current target {@link Buffer} of the serializer */ + /** Current target {@link Buffer} of the serializer. 
*/ @Nullable private BufferBuilder targetBuffer; @@ -73,7 +73,6 @@ public class SpanningRecordSerializer implements R * @param record the record to serialize * @return how much information was written to the target buffer and * whether this buffer is full - * @throws IOException */ @Override public SerializationResult addRecord(T record) throws IOException { @@ -114,14 +113,14 @@ public class SpanningRecordSerializer implements R } SerializationResult result = getSerializationResult(); - + // make sure we don't hold onto the large buffers for too long if (result.isFullRecord()) { serializationBuffer.clear(); serializationBuffer.pruneBuffer(); dataBuffer = serializationBuffer.wrapAsByteBuffer(); } - + return result; } http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 74260d4..985a93e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -19,11 +19,11 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.util.StringUtils; import java.io.BufferedInputStream; @@ -42,18 +42,17 @@ import java.util.Random; * @param The type of the record to be deserialized. */ public class SpillingAdaptiveSpanningRecordDeserializer implements RecordDeserializer { - + private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE = "Serializer consumed more bytes than the record had. " + "This indicates broken serialization. If you are using custom serialization types " + "(Value or Writable), check their serialization methods. 
If you are using a " + "Kryo-serialized type, check the corresponding Kryo serializer."; - + private static final int THRESHOLD_FOR_SPILLING = 5 * 1024 * 1024; // 5 MiBytes - - + private final NonSpanningWrapper nonSpanningWrapper; - + private final SpanningWrapper spanningWrapper; private Buffer currentBuffer; @@ -79,7 +78,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer= 4) { int len = this.nonSpanningWrapper.readInt(); @@ -137,17 +136,17 @@ public class SpillingAdaptiveSpanningRecordDeserializer b.length) { throw new IndexOutOfBoundsException(); } - + this.segment.get(this.position, b, off, len); this.position += len; } @@ -279,7 +278,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer 0 && bld.charAt(len - 1) == '\r') { @@ -305,10 +304,10 @@ public class SpillingAdaptiveSpanningRecordDeserializer THRESHOLD_FOR_SPILLING) { // create a spilling channel and put the data there this.spillingChannel = createSpillingChannel(); - + ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk); this.spillingChannel.write(toWrite); } @@ -492,23 +491,23 @@ public class SpillingAdaptiveSpanningRecordDeserializer 0) { int toPut = Math.min(this.lengthBuffer.remaining(), numBytesInSegment); segment.get(0, this.lengthBuffer, toPut); - + // did we complete the length? if (this.lengthBuffer.hasRemaining()) { return; @@ -517,7 +516,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer THRESHOLD_FOR_SPILLING) { this.spillingChannel = createSpillingChannel(); } @@ -538,16 +537,16 @@ public class SpillingAdaptiveSpanningRecordDeserializer= 0 && this.accumulatedRecordBytes >= this.recordLength; } - + private int getNumGatheredBytes() { return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 4 : lengthBuffer.position()); } @@ -586,7 +585,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer { private final int numChannels; - /** {@link RecordSerializer} per outgoing channel */ + /** {@link RecordSerializer} per outgoing channel. */ private final RecordSerializer[] serializers; - private final Random RNG = new XORShiftRandom(); + private final Random rng = new XORShiftRandom(); private Counter numBytesOut = new SimpleCounter(); @@ -74,7 +74,7 @@ public class RecordWriter { this.numChannels = writer.getNumberOfSubpartitions(); - /** + /* * The runtime exposes a channel abstraction for the produced results * (see {@link ChannelSelector}). Every channel has an independent * serializer. @@ -102,10 +102,10 @@ public class RecordWriter { } /** - * This is used to send LatencyMarks to a random target channel + * This is used to send LatencyMarks to a random target channel. */ public void randomEmit(T record) throws IOException, InterruptedException { - sendToTarget(record, RNG.nextInt(numChannels)); + sendToTarget(record, rng.nextInt(numChannels)); } private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { @@ -203,7 +203,6 @@ public class RecordWriter { /** * Sets the metric group for this RecordWriter. - * @param metrics */ public void setMetricGroup(TaskIOMetricGroup metrics) { numBytesOut = metrics.getNumBytesOutCounter(); @@ -213,7 +212,7 @@ public class RecordWriter { * Writes the buffer to the {@link ResultPartitionWriter} and removes the * buffer from the serializer state. * - * Needs to be synchronized on the serializer! + *
<p>
Needs to be synchronized on the serializer! */ private void writeAndClearBuffer( Buffer buffer, http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java index c7d25e5..c707d47 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.io.network.api.writer; import org.apache.flink.core.io.IOReadableWritable; @@ -42,7 +41,6 @@ public class RoundRobinChannelSelector implements this.nextChannelToSendTo[0] = 0; } - @Override public int[] selectChannels(final T record, final int numberOfOutputChannels) { http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java index 2f481b9..2ad50dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java @@ -18,14 +18,17 @@ package org.apache.flink.runtime.io.network.api; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.junit.Test; import static org.junit.Assert.fail; +/** + * Tests for the {@link CheckpointBarrier} type. 
+ */ public class CheckpointBarrierTest { /** http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java index c6c0645..d57675f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/reader/AbstractReaderTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.util.event.EventListener; + import org.junit.Test; import org.mockito.Matchers; http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java index 62d6aa5..de5f4a8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java @@ -18,13 +18,6 @@ package org.apache.flink.runtime.io.network.api.serialization; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.event.AbstractEvent; @@ -34,10 +27,21 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.util.TestTaskEvent; - import org.apache.flink.runtime.state.CheckpointStorageLocationReference; + import org.junit.Test; +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link EventSerializer}. 
+ */ public class EventSerializerTest { @Test @@ -78,12 +82,12 @@ public class EventSerializerTest { new TestTaskEvent(Math.random(), 12361231273L), new CancelCheckpointMarker(287087987329842L) }; - + for (AbstractEvent evt : events) { ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt); assertTrue(serializedEvent.hasRemaining()); - AbstractEvent deserialized = + AbstractEvent deserialized = EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader()); assertNotNull(deserialized); assertEquals(evt, deserialized); @@ -94,8 +98,6 @@ public class EventSerializerTest { * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} * whether it peaks into the buffer only, i.e. after the call, the buffer * is still de-serializable. - * - * @throws Exception */ @Test public void testIsEventPeakOnly() throws Exception { @@ -117,8 +119,6 @@ public class EventSerializerTest { /** * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} returns * the correct answer for various encoded event buffers. - * - * @throws Exception */ @Test public void testIsEvent() throws Exception { @@ -151,7 +151,6 @@ public class EventSerializerTest { * * @return whether {@link EventSerializer#isEvent(ByteBuffer, Class, ClassLoader)} * thinks the encoded buffer matches the class - * @throws IOException */ private boolean checkIsEvent( AbstractEvent event, http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java index aad7ee1..3f782fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java @@ -20,11 +20,11 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.memory.AbstractPagedInputView; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; import org.apache.flink.testutils.serialization.types.SerializationTestType; import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; import org.apache.flink.testutils.serialization.types.Util; -import org.apache.flink.runtime.memory.AbstractPagedInputView; -import org.apache.flink.runtime.memory.AbstractPagedOutputView; import org.junit.Test; @@ -39,14 +39,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Tests for the {@link AbstractPagedInputView} and {@link AbstractPagedOutputView}. 
+ */ public class PagedViewsTest { @Test public void testSequenceOfIntegersWithAlignedBuffers() { try { - final int NUM_INTS = 1000000; + final int numInts = 1000000; - testSequenceOfTypes(Util.randomRecords(NUM_INTS, SerializationTestTypeFactory.INT), 2048); + testSequenceOfTypes(Util.randomRecords(numInts, SerializationTestTypeFactory.INT), 2048); } catch (Exception e) { e.printStackTrace(); @@ -57,9 +60,9 @@ public class PagedViewsTest { @Test public void testSequenceOfIntegersWithUnalignedBuffers() { try { - final int NUM_INTS = 1000000; + final int numInts = 1000000; - testSequenceOfTypes(Util.randomRecords(NUM_INTS, SerializationTestTypeFactory.INT), 2047); + testSequenceOfTypes(Util.randomRecords(numInts, SerializationTestTypeFactory.INT), 2047); } catch (Exception e) { e.printStackTrace(); @@ -70,10 +73,10 @@ public class PagedViewsTest { @Test public void testRandomTypes() { try { - final int NUM_TYPES = 100000; + final int numTypes = 100000; // test with an odd buffer size to force many unaligned cases - testSequenceOfTypes(Util.randomRecords(NUM_TYPES), 57); + testSequenceOfTypes(Util.randomRecords(numTypes), 57); } catch (Exception e) { e.printStackTrace(); @@ -91,7 +94,7 @@ public class PagedViewsTest { try { outputView.write(expected); - }catch(Exception e){ + } catch (Exception e) { e.printStackTrace(); fail("Unexpected exception: Could not write to TestOutputView."); } @@ -123,7 +126,7 @@ public class PagedViewsTest { try { outputView.write(expected); - }catch(Exception e){ + } catch (Exception e) { e.printStackTrace(); fail("Unexpected exception: Could not write to TestOutputView."); } @@ -156,7 +159,7 @@ public class PagedViewsTest { try { outputView.write(expected); - }catch(Exception e){ + } catch (Exception e) { e.printStackTrace(); fail("Unexpected exception: Could not write to TestOutputView."); } @@ -178,7 +181,7 @@ public class PagedViewsTest { assertEquals(inputView.getCurrentPositionInSegment(), bytes2Write % segmentSize); byte[] tempBuffer = new byte[bytesRead]; - System.arraycopy(buffer,0,tempBuffer,0,bytesRead); + System.arraycopy(buffer, 0, tempBuffer, 0, bytesRead); assertArrayEquals(expected, tempBuffer); } @@ -194,7 +197,7 @@ public class PagedViewsTest { try { outputView.write(expected); - }catch(Exception e){ + } catch (Exception e){ e.printStackTrace(); fail("Unexpected exception: Could not write to TestOutputView."); } @@ -215,12 +218,12 @@ public class PagedViewsTest { assertEquals(bytes2Write, bytesRead); byte[] tempBuffer = new byte[bytesRead]; - System.arraycopy(buffer,0,tempBuffer,0,bytesRead); + System.arraycopy(buffer, 0, tempBuffer, 0, bytesRead); assertArrayEquals(expected, tempBuffer); - try{ + try { bytesRead = inputView.read(buffer); - }catch(IOException e){ + } catch (IOException e){ e.printStackTrace(); fail("Unexpected exception: Input view should be empty and thus return -1."); } @@ -241,7 +244,7 @@ public class PagedViewsTest { try { outputView.write(expected); - }catch(Exception e){ + } catch (Exception e) { e.printStackTrace(); fail("Unexpected exception: Could not write to TestOutputView."); } @@ -254,7 +257,7 @@ public class PagedViewsTest { try { inputView.readFully(buffer); - }catch(EOFException e){ + } catch (EOFException e) { //Expected exception eofException = true; } @@ -267,9 +270,9 @@ public class PagedViewsTest { int bytesRead = 0; - try{ - bytesRead =inputView.read(buffer); - }catch(Exception e){ + try { + bytesRead = inputView.read(buffer); + } catch (Exception e) { e.printStackTrace(); fail("Unexpected exception: Could 
not read TestInputView."); } @@ -288,7 +291,7 @@ public class PagedViewsTest { try { outputView.write(expected); - }catch(Exception e){ + } catch (Exception e) { e.printStackTrace(); fail("Unexpected exception: Could not write to TestOutputView."); } @@ -296,7 +299,7 @@ public class PagedViewsTest { outputView.close(); TestInputView inputView = new TestInputView(outputView.segments); - byte[] buffer = new byte[2*bufferSize]; + byte[] buffer = new byte[2 * bufferSize]; try { inputView.readFully(buffer, bufferSize, bufferSize); @@ -307,7 +310,7 @@ public class PagedViewsTest { assertEquals(inputView.getCurrentPositionInSegment(), bufferSize % segmentSize); byte[] tempBuffer = new byte[bufferSize]; - System.arraycopy(buffer, bufferSize, tempBuffer,0, bufferSize); + System.arraycopy(buffer, bufferSize, tempBuffer, 0, bufferSize); assertArrayEquals(expected, tempBuffer); } @@ -321,12 +324,12 @@ public class PagedViewsTest { byte[] buffer = new byte[segmentSize]; boolean eofException = false; - try{ + try { inputView.readFully(buffer); - }catch(EOFException e){ + } catch (EOFException e) { //expected Exception eofException = true; - }catch(Exception e){ + } catch (Exception e) { e.printStackTrace(); fail("Unexpected exception: Could not read TestInputView."); } @@ -334,10 +337,9 @@ public class PagedViewsTest { assertTrue("EOFException expected.", eofException); } - private static void testSequenceOfTypes(Iterable sequence, int segmentSize) throws Exception { - List elements = new ArrayList(512); + List elements = new ArrayList<>(512); TestOutputView outView = new TestOutputView(segmentSize); // write @@ -373,7 +375,7 @@ public class PagedViewsTest { private static final class TestOutputView extends AbstractPagedOutputView { - private final List segments = new ArrayList(); + private final List segments = new ArrayList<>(); private final int segmentSize; @@ -400,7 +402,6 @@ public class PagedViewsTest { private int num; - private TestInputView(List segments) { super(segments.get(0).segment, segments.get(0).position, 0); http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index ed0ce6c..ebe588c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -30,16 +30,19 @@ import java.util.ArrayDeque; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; +/** + * Tests for the {@link SpillingAdaptiveSpanningRecordDeserializer} and {@link AdaptiveSpanningRecordDeserializer}. 
+ */ public class SpanningRecordSerializationTest { @Test public void testIntRecordsSpanningMultipleSegments() { - final int SEGMENT_SIZE = 1; - final int NUM_VALUES = 10; + final int segmentSize = 1; + final int numValues = 10; try { - testNonSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE); - testSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE); + testNonSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); + testSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); } catch (Exception e) { e.printStackTrace(); @@ -49,12 +52,12 @@ public class SpanningRecordSerializationTest { @Test public void testIntRecordsWithAlignedBuffers () { - final int SEGMENT_SIZE = 64; - final int NUM_VALUES = 64; + final int segmentSize = 64; + final int numValues = 64; try { - testNonSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE); - testSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE); + testNonSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); + testSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); } catch (Exception e) { e.printStackTrace(); @@ -64,12 +67,12 @@ public class SpanningRecordSerializationTest { @Test public void testIntRecordsWithUnalignedBuffers () { - final int SEGMENT_SIZE = 31; - final int NUM_VALUES = 248; + final int segmentSize = 31; + final int numValues = 248; try { - testNonSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE); - testSpillingDeserializer(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE); + testNonSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); + testSpillingDeserializer(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); } catch (Exception e) { e.printStackTrace(); @@ -78,13 +81,13 @@ public class SpanningRecordSerializationTest { } @Test - public void testRandomRecords () { - final int SEGMENT_SIZE = 127; - final int NUM_VALUES = 10000; + public void testRandomRecords () { + final int segmentSize = 127; + final int numValues = 10000; try { - testNonSpillingDeserializer(Util.randomRecords(NUM_VALUES), SEGMENT_SIZE); - testSpillingDeserializer(Util.randomRecords(NUM_VALUES), SEGMENT_SIZE); + testNonSpillingDeserializer(Util.randomRecords(numValues), segmentSize); + testSpillingDeserializer(Util.randomRecords(numValues), segmentSize); } catch (Exception e) { e.printStackTrace(); @@ -95,38 +98,37 @@ public class SpanningRecordSerializationTest { // ----------------------------------------------------------------------------------------------------------------- private void testNonSpillingDeserializer(Util.MockRecords records, int segmentSize) throws Exception { - RecordSerializer serializer = new SpanningRecordSerializer(); - RecordDeserializer deserializer = new AdaptiveSpanningRecordDeserializer(); - + RecordSerializer serializer = new SpanningRecordSerializer<>(); + RecordDeserializer deserializer = new AdaptiveSpanningRecordDeserializer<>(); + test(records, segmentSize, serializer, deserializer); } - + private void testSpillingDeserializer(Util.MockRecords records, int segmentSize) throws Exception { - RecordSerializer 
serializer = new SpanningRecordSerializer(); - RecordDeserializer deserializer = - new SpillingAdaptiveSpanningRecordDeserializer( + RecordSerializer serializer = new SpanningRecordSerializer<>(); + RecordDeserializer deserializer = + new SpillingAdaptiveSpanningRecordDeserializer<>( new String[] { System.getProperty("java.io.tmpdir") }); - + test(records, segmentSize, serializer, deserializer); } - + /** * Iterates over the provided records and tests whether {@link SpanningRecordSerializer} and {@link AdaptiveSpanningRecordDeserializer} * interact as expected. - *
<p>
- * Only a single {@link MemorySegment} will be allocated. + * + *
<p>
Only a single {@link MemorySegment} will be allocated. * * @param records records to test * @param segmentSize size for the {@link MemorySegment} */ - private void test(Util.MockRecords records, int segmentSize, + private void test(Util.MockRecords records, int segmentSize, RecordSerializer serializer, - RecordDeserializer deserializer) - throws Exception - { - final int SERIALIZATION_OVERHEAD = 4; // length encoding + RecordDeserializer deserializer) throws Exception { - final ArrayDeque serializedRecords = new ArrayDeque(); + final int serializationOverhead = 4; // length encoding + + final ArrayDeque serializedRecords = new ArrayDeque<>(); // ------------------------------------------------------------------------------------------------------------- @@ -139,7 +141,7 @@ public class SpanningRecordSerializationTest { serializedRecords.add(record); numRecords++; - numBytes += record.length() + SERIALIZATION_OVERHEAD; + numBytes += record.length() + serializationOverhead; // serialize record if (serializer.addRecord(record).isFullBuffer()) { @@ -162,9 +164,6 @@ public class SpanningRecordSerializationTest { while (serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)).isFullBuffer()) { deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize); } - - - } } @@ -184,7 +183,6 @@ public class SpanningRecordSerializationTest { numRecords--; } - // assert that all records have been serialized and deserialized Assert.assertEquals(0, numRecords); Assert.assertFalse(serializer.hasData()); http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java index ed6677e..955fc39 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java @@ -33,13 +33,16 @@ import java.util.Random; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; +/** + * Tests for the {@link SpanningRecordSerializer}. 

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
index ed6677e..955fc39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
@@ -33,13 +33,16 @@ import java.util.Random;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
+/**
+ * Tests for the {@link SpanningRecordSerializer}.
+ */
 public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testHasData() {
-		final int SEGMENT_SIZE = 16;
+		final int segmentSize = 16;
 
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
 
 		Assert.assertFalse(serializer.hasData());
@@ -48,13 +51,13 @@ public class SpanningRecordSerializerTest {
 		serializer.addRecord(randomIntRecord);
 		Assert.assertTrue(serializer.hasData());
 
-		serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
+		serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 		Assert.assertTrue(serializer.hasData());
 
 		serializer.clear();
 		Assert.assertFalse(serializer.hasData());
 
-		serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
+		serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 		serializer.addRecord(randomIntRecord);
 		Assert.assertTrue(serializer.hasData());
 
@@ -70,16 +73,17 @@ public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testEmptyRecords() {
-		final int SEGMENT_SIZE = 11;
+		final int segmentSize = 11;
 
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 
 		try {
 			Assert.assertEquals(
 				RecordSerializer.SerializationResult.FULL_RECORD,
-				serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)));
+				serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)));
 		} catch (IOException e) {
 			e.printStackTrace();
+			Assert.fail(e.getMessage());
 		}
 
 		try {
@@ -120,7 +124,7 @@ public class SpanningRecordSerializerTest {
 			result = serializer.addRecord(emptyRecord);
 			Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
 
-			result = serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
+			result = serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 			Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
 		} catch (Exception e) {
@@ -131,11 +135,11 @@ public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testIntRecordsSpanningMultipleSegments() {
-		final int SEGMENT_SIZE = 1;
-		final int NUM_VALUES = 10;
+		final int segmentSize = 1;
+		final int numValues = 10;
 
 		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+			test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -145,11 +149,11 @@ public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testIntRecordsWithAlignedSegments() {
-		final int SEGMENT_SIZE = 64;
-		final int NUM_VALUES = 64;
+		final int segmentSize = 64;
+		final int numValues = 64;
 
 		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+			test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -159,11 +163,11 @@ public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testIntRecordsWithUnalignedSegments() {
-		final int SEGMENT_SIZE = 31;
-		final int NUM_VALUES = 248; // least common multiple => last record should align
+		final int segmentSize = 31;
+		final int numValues = 248; // least common multiple => last record should align
 
 		try {
-			test(Util.randomRecords(NUM_VALUES, SerializationTestTypeFactory.INT), SEGMENT_SIZE);
+			test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -173,11 +177,11 @@ public class SpanningRecordSerializerTest {
 
 	@Test
 	public void testRandomRecords() {
-		final int SEGMENT_SIZE = 127;
-		final int NUM_VALUES = 100000;
+		final int segmentSize = 127;
+		final int numValues = 100000;
 
 		try {
-			test(Util.randomRecords(NUM_VALUES), SEGMENT_SIZE);
+			test(Util.randomRecords(numValues), segmentSize);
 		} catch (Exception e) {
 			e.printStackTrace();
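
A note on the length encoding asserted in testEmptyRecords above: every record, even an empty one, is prefixed with a 4-byte length field, so with segmentSize = 11 only two empty records (2 x 4 = 8 bytes) fit before a third one spans into the next segment. The middle of that test is elided in this hunk, but the visible assertions reduce to the following pattern (illustrative fragment, not part of the commit):

	// the 4-byte length field alone overflowed the current segment:
	result = serializer.addRecord(emptyRecord);
	Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);

	// a fresh segment lets the serializer finish the spanning record:
	result = serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
	Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);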
@@ -190,16 +194,16 @@ public class SpanningRecordSerializerTest {
 	/**
 	 * Iterates over the provided records and tests whether the {@link SpanningRecordSerializer} returns the expected
 	 * {@link RecordSerializer.SerializationResult} values.
-	 * <p>
-	 * Only a single {@link MemorySegment} will be allocated.
+	 *
+	 * <p>Only a single {@link MemorySegment} will be allocated.
 	 *
 	 * @param records records to test
 	 * @param segmentSize size for the {@link MemorySegment}
 	 */
 	private void test(Util.MockRecords records, int segmentSize) throws Exception {
-		final int SERIALIZATION_OVERHEAD = 4; // length encoding
+		final int serializationOverhead = 4; // length encoding
 
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
+		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 
 		// -------------------------------------------------------------------------------------------------------------
 
@@ -208,7 +212,7 @@ public class SpanningRecordSerializerTest {
 		int numBytes = 0;
 		for (SerializationTestType record : records) {
 			RecordSerializer.SerializationResult result = serializer.addRecord(record);
-			numBytes += record.length() + SERIALIZATION_OVERHEAD;
+			numBytes += record.length() + serializationOverhead;
 
 			if (numBytes < segmentSize) {
 				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
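
Aside on the "least common multiple" comment in testIntRecordsWithUnalignedSegments: an INT record serializes to 4 payload bytes plus the 4-byte length field, i.e. 8 bytes per record, and lcm(8, 31) = 248. After 248 records exactly 248 x 8 = 1984 = 64 x 31 bytes have been written, so the last record ends flush on a segment boundary. A throwaway check of that arithmetic (illustrative fragment, not part of the commit):

	final int bytesPerRecord = 4 + 4;   // INT payload + length encoding
	final int segmentSize = 31;
	final int numValues = 248;          // lcm(8, 31)
	System.out.println((numValues * bytesPerRecord) % segmentSize == 0); // true: 1984 = 64 * 31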

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index e04809c..4fac5b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -35,8 +35,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
@@ -80,6 +78,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+/**
+ * Tests for the {@link RecordWriter}.
+ */
 @PrepareForTest({EventSerializer.class})
 @RunWith(PowerMockRunner.class)
 public class RecordWriterTest {
@@ -311,7 +312,7 @@ public class RecordWriterTest {
 		ResultPartitionWriter partitionWriter =
 			spy(new RecyclingPartitionWriter(new TestPooledBufferProvider(1, 16)));
 
-		RecordWriter<IntValue> recordWriter = new RecordWriter<IntValue>(partitionWriter);
+		RecordWriter<IntValue> recordWriter = new RecordWriter<>(partitionWriter);
 
 		// Fill a buffer, but don't write it out.
 		recordWriter.emit(new IntValue(0));
@@ -432,8 +433,6 @@ public class RecordWriterTest {
 	/**
 	 * Tests that event buffers are properly recycled when broadcasting events
 	 * to multiple channels.
-	 *
-	 * @throws Exception
 	 */
 	@Test
 	public void testBroadcastEventBufferReferenceCounting() throws Exception {
@@ -493,6 +492,7 @@ public class RecordWriterTest {
 		buffer1.setReaderIndex(1);
 		assertEquals("Buffer 2 shares the same reader index as buffer 1", 0, buffer2.getReaderIndex());
 	}
+
 	/**
 	 * Tests that broadcasted records' buffers are independent (in their (reader) indices) once they
 	 * are put into the queue for Netty when broadcasting events to multiple channels.
@@ -527,36 +527,6 @@ public class RecordWriterTest {
 	// Helpers
 	// ---------------------------------------------------------------------------------------------
 
-	private BufferProvider createBufferProvider(final int bufferSize)
-			throws IOException, InterruptedException {
-
-		BufferProvider bufferProvider = mock(BufferProvider.class);
-		when(bufferProvider.requestBufferBlocking()).thenAnswer(
-			new Answer<Buffer>() {
-				@Override
-				public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
-					MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
-					Buffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
-					return buffer;
-				}
-			}
-		);
-
-		return bufferProvider;
-	}
-
-	private BufferProvider createBufferProvider(Buffer... buffers)
-			throws IOException, InterruptedException {
-
-		BufferProvider bufferProvider = mock(BufferProvider.class);
-
-		for (int i = 0; i < buffers.length; i++) {
-			when(bufferProvider.requestBufferBlocking()).thenReturn(buffers[i]);
-		}
-
-		return bufferProvider;
-	}
-
 	/**
 	 * Partition writer that collects the added buffers/events in multiple queue.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/b4675f2a/tools/maven/suppressions-runtime.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index 9d9cb87..33a92e3 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -87,11 +87,11 @@ under the License.
		<suppress
			files="(.*)test[/\\](.*)runtime[/\\]io[/\\](async|disk)[/\\](.*)"
			checks="AvoidStarImport|UnusedImports"/>
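
The two RecordWriterTest Javadocs above assert the same invariant from different angles: when a record or event is broadcast to several channels, each channel receives its own view of the shared buffer, so advancing one consumer's reader index must not move another's. Stripped of the Flink types, the invariant is the familiar one from java.nio (plain-Java illustration, not Flink API and not part of the commit):

	import java.nio.ByteBuffer;

	public class IndependentViews {
		public static void main(String[] args) {
			ByteBuffer shared = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});

			// duplicate() shares the content but gives each view its own position,
			// mirroring what the broadcast tests demand of duplicated network buffers
			ByteBuffer view1 = shared.duplicate();
			ByteBuffer view2 = shared.duplicate();

			view1.get(); // advance view1's "reader index" by one byte

			System.out.println(view1.position()); // 1
			System.out.println(view2.position()); // 0 -> views are independent
		}
	}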