flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-2011] [runtime] Improve error message when user-defined serialization logic is erroneous
Date Wed, 13 May 2015 18:01:40 GMT
Repository: flink
Updated Branches:
  refs/heads/master 48e21a1ae -> 113b20b7f


[FLINK-2011] [runtime] Improve error message when user-defined serialization logic is erroneous


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/113b20b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/113b20b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/113b20b7

Branch: refs/heads/master
Commit: 113b20b7f8717b12c5f0dfa691da582d426fbae0
Parents: 9da2f1f
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed May 13 15:27:00 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed May 13 18:26:26 2015 +0200

----------------------------------------------------------------------
 .../api/reader/AbstractRecordReader.java        | 29 ++++++++++++++------
 ...llingAdaptiveSpanningRecordDeserializer.java |  1 -
 2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/113b20b7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 4ee7fad..bf43c72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -79,16 +79,27 @@ abstract class AbstractRecordReader<T extends IOReadableWritable>
extends Abstra
 				currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
 				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
 			}
-			else if (handleEvent(bufferOrEvent.getEvent())) {
-				if (inputGate.isFinished()) {
-					isFinished = true;
-
-					return false;
+			else {
+				// sanity check for leftover data in deserializers. events should only come between
+				// records, not in the middle of a fragment
+				if (recordDeserializers[bufferOrEvent.getChannelIndex()].hasUnfinishedData()) {
+					throw new IllegalStateException(
+							"Received an event in channel " + bufferOrEvent.getChannelIndex() + " while still
having "
+							+ "data from a record. This indicates broken serialization logic. "
+							+ "If you are using custom serialization code (Writable or Value types), check their
"
+							+ "serialization routines. In the case of Kryo, check the respective Kryo serializer.");
+				}
+				
+				if (handleEvent(bufferOrEvent.getEvent())) {
+					if (inputGate.isFinished()) {
+						isFinished = true;
+						return false;
+					}
+					else if (hasReachedEndOfSuperstep()) {
+						return false;
+					}
+					// else: More data is coming...
 				}
-				else if (hasReachedEndOfSuperstep()) {
-
-					return false;
-				} // else: More data is coming...
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/113b20b7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index eae3e65..453d448 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.util.StringUtils;


Mime
View raw message