flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-1531] [runtime] Adds proper EOFException forwarding to KryoSerializer.
Date Thu, 12 Feb 2015 17:40:05 GMT
Repository: flink
Updated Branches:
  refs/heads/master df0d62a71 -> bc1432a2f


[FLINK-1531] [runtime] Adds proper EOFException forwarding to KryoSerializer.

This closes #391.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21f47d9c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21f47d9c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21f47d9c

Branch: refs/heads/master
Commit: 21f47d9c69441c17b5f90ea2c7cb8f4d47f7fcb5
Parents: df0d62a
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Feb 12 16:50:46 2015 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Feb 12 18:24:13 2015 +0100

----------------------------------------------------------------------
 .../java/typeutils/runtime/KryoSerializer.java  | 13 +++++-
 .../java/typeutils/runtime/NoFetchingInput.java |  5 ++-
 .../runtime/KryoGenericTypeSerializerTest.java  | 43 +++++++++++++++++++-
 .../runtime/TestDataOutputSerializer.java       |  7 ++++
 .../java/typeutils/runtime/TestInputView.java   | 40 ++++++++++++++++++
 5 files changed, 103 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21f47d9c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 133dd57..d8411a0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -195,7 +195,18 @@ public class KryoSerializer<T> extends TypeSerializer<T>
{
 			input = new NoFetchingInput(inputStream);
 			previousIn = source;
 		}
-		return (T) kryo.readClassAndObject(input);
+
+		try {
+			return (T) kryo.readClassAndObject(input);
+		} catch (KryoException ke) {
+			Throwable cause = ke.getCause();
+
+			if(cause instanceof EOFException) {
+				throw (EOFException) cause;
+			} else {
+				throw ke;
+			}
+		}
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/21f47d9c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
index 524347c..0f4fe94 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Input;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -73,7 +74,7 @@ public class NoFetchingInput extends Input {
 			count = fill(buffer, bytesRead, required - bytesRead);
 
 			if(count == -1){
-				throw new KryoException("Buffer underflow");
+				throw new KryoException(new EOFException("No more bytes left."));
 			}
 
 			bytesRead += count;
@@ -121,7 +122,7 @@ public class NoFetchingInput extends Input {
 				c = inputStream.read(bytes, offset+bytesRead, count-bytesRead);
 
 				if(c == -1){
-					throw new KryoException("Buffer underflow");
+					throw new KryoException(new EOFException("No more bytes left."));
 				}
 
 				bytesRead += c;

http://git-wip-us.apache.org/repos/asf/flink/blob/21f47d9c/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
index 8630d95..afa8c52 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/KryoGenericTypeSerializerTest.java
@@ -75,10 +75,10 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer
 	}
 	
 	/**
-	 * Make sure that the kryo serializer forwards EOF exceptions properly
+	 * Make sure that the kryo serializer forwards EOF exceptions properly when serializing
 	 */
 	@Test
-	public void testForwardEOFException() {
+	public void testForwardEOFExceptionWhileSerializing() {
 		try {
 			// construct a long string
 			String str;
@@ -113,4 +113,43 @@ public class KryoGenericTypeSerializerTest extends AbstractGenericTypeSerializer
 			fail(e.getMessage());
 		}
 	}
+
+	/**
+	 * Make sure that the kryo serializer forwards EOF exceptions properly when serializing
+	 */
+	@Test
+	public void testForwardEOFExceptionWhileDeserializing() {
+		try {
+			int numElements = 100;
+			// construct a memory target that is too small for the string
+			TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements);
+			KryoSerializer<Integer> serializer = new KryoSerializer<Integer>(Integer.class,
new ExecutionConfig());
+
+			for(int i = 0; i < numElements; i++){
+				serializer.serialize(i, target);
+			}
+
+			TestInputView source = new TestInputView(target.copyByteBuffer());
+
+			for(int i = 0; i < numElements; i++){
+				int value = serializer.deserialize(source);
+				assertEquals(i, value);
+			}
+
+			try {
+				serializer.deserialize(source);
+				fail("should throw a java.io.EOFException");
+			}
+			catch (java.io.EOFException e) {
+				// that is how we like it :-)
+			}
+			catch (Exception e) {
+				fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21f47d9c/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
index 58a2aeb..87be6db 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java
@@ -59,6 +59,13 @@ public final class TestDataOutputSerializer implements DataOutputView {
 		return this.wrapper;
 	}
 
+	public byte[] copyByteBuffer() {
+		byte[] target = new byte[position];
+		System.arraycopy(buffer, 0, target, 0, position);
+
+		return target;
+	}
+
 	public void clear() {
 		this.position = 0;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/21f47d9c/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java
b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java
new file mode 100644
index 0000000..e5ef665
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+class TestInputView extends DataInputStream implements DataInputView {
+
+	public TestInputView(byte[] data) {
+		super(new ByteArrayInputStream(data));
+	}
+
+	@Override
+	public void skipBytesToRead(int numBytes) throws IOException {
+		while (numBytes > 0) {
+			int skipped = skipBytes(numBytes);
+			numBytes -= skipped;
+		}
+	}
+}


Mime
View raw message