flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] incubator-flink git commit: [APIs] Enhance test coverage for CollectionInputFormat and add tests for failed serializations of user code objects
Date Fri, 21 Nov 2014 15:58:03 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 02e2857a9 -> cf54a1c2a


[APIs] Enhance test coverage for CollectionInputFormat and add tests for failed serializations
of user code objects


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d7853fd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d7853fd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d7853fd3

Branch: refs/heads/master
Commit: d7853fd3157310cae2e66dfc0e9ff146905bdb33
Parents: 02e2857
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Nov 20 15:25:21 2014 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Nov 21 16:56:26 2014 +0100

----------------------------------------------------------------------
 .../flink/util/InstantiationUtilTest.java       |  86 ++++++++
 .../api/java/io/CollectionInputFormat.java      |   7 +-
 .../api/java/io/CollectionInputFormatTest.java  | 202 ++++++++++++++++---
 .../flink/api/java/io/CsvInputFormatTest.java   |   7 +-
 4 files changed, 267 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
index bf4fc8c..aded919 100644
--- a/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/InstantiationUtilTest.java
@@ -19,17 +19,21 @@
 package org.apache.flink.util;
 
 import org.apache.flink.api.common.typeutils.base.DoubleValueSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class InstantiationUtilTest {
 
@@ -79,6 +83,88 @@ public class InstantiationUtilTest {
 
 		assertEquals("Serialized record is not equal after serialization.", toSerialize, deserialized);
 	}
+	
+	@Test
+	public void testWriteToConfigFailingSerialization() {
+		try {
+			final String key1 = "testkey1";
+			final String key2 = "testkey2";
+			final Configuration config = new Configuration();
+			
+			try {
+				InstantiationUtil.writeObjectToConfig(new TestClassWriteFails(), config, "irgnored");
+				fail("should throw an exception");
+			}
+			catch (TestException e) {
+				// expected
+			}
+			catch (Exception e) {
+				fail("Wrong exception type - exception not properly forwarded");
+			}
+			
+			InstantiationUtil.writeObjectToConfig(new TestClassReadFails(), config, key1);
+			InstantiationUtil.writeObjectToConfig(new TestClassReadFailsCNF(), config, key2);
+			
+			try {
+				InstantiationUtil.readObjectFromConfig(config, key1, getClass().getClassLoader());
+				fail("should throw an exception");
+			}
+			catch (TestException e) {
+				// expected
+			}
+			catch (Exception e) {
+				fail("Wrong exception type - exception not properly forwarded");
+			}
+			
+			try {
+				InstantiationUtil.readObjectFromConfig(config, key2, getClass().getClassLoader());
+				fail("should throw an exception");
+			}
+			catch (ClassNotFoundException e) {
+				// expected
+			}
+			catch (Exception e) {
+				fail("Wrong exception type - exception not properly forwarded");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
+	// --------------------------------------------------------------------------------------------
+	
 	private class TestClass {}
+	
+	private static class TestException extends IOException{
+		private static final long serialVersionUID = 1L;
+	}
+	
+	private static class TestClassWriteFails implements java.io.Serializable {
+		
+		private static final long serialVersionUID = 1L;
+
+		private void writeObject(ObjectOutputStream out) throws IOException {
+			throw new TestException();
+		}
+	}
+	
+	private static class TestClassReadFails implements java.io.Serializable {
+		
+		private static final long serialVersionUID = 1L;
+
+		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+			throw new TestException();
+		}
+	}
+	
+	private static class TestClassReadFailsCNF implements java.io.Serializable {
+		
+		private static final long serialVersionUID = 1L;
+
+		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+			throw new ClassNotFoundException("test exception");
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index 89adf96..b999ede 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -80,8 +80,9 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		out.defaultWriteObject();
 		out.writeInt(dataSet.size());
 		
+		OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
 		for (T element : dataSet){
-			serializer.serialize(element, new OutputViewObjectOutputStreamWrapper(out));
+			serializer.serialize(element, wrapper);
 		}
 	}
 
@@ -91,10 +92,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		int collectionLength = in.readInt();
 		List<T> list = new ArrayList<T>(collectionLength);
 		
-
+		InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
 		for (int i = 0; i < collectionLength; i++){
 			T element = serializer.createInstance();
-			element = serializer.deserialize(element, new InputViewObjectInputStreamWrapper(in));
+			element = serializer.deserialize(element, wrapper);
 			list.add(element);
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 11a018c..948d22f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
 import static org.junit.Assert.assertEquals;
@@ -26,8 +25,11 @@ import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -38,11 +40,13 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 public class CollectionInputFormatTest {
-	public static class ElementType{
-		private int id;
+	
+	public static class ElementType {
+		private final int id;
 
 		public ElementType(){
 			this(-1);
@@ -52,37 +56,43 @@ public class CollectionInputFormatTest {
 			this.id = id;
 		}
 
-		public int getId(){return id;}
+		public int getId() {
+			return id;
+		}
 
 		@Override
-		public boolean equals(Object obj){
-			if(obj != null && obj instanceof ElementType){
+		public boolean equals(Object obj) {
+			if (obj != null && obj instanceof ElementType) {
 				ElementType et = (ElementType) obj;
-
 				return et.getId() == this.getId();
-			}else {
+			} else {
 				return false;
 			}
 		}
+		
+		@Override
+		public int hashCode() {
+			return id;
+		}
 	}
 
 	@Test
-	public void testSerializability(){
-		Collection<ElementType> inputCollection = new ArrayList<ElementType>();
-		ElementType element1 = new ElementType(1);
-		ElementType element2 = new ElementType(2);
-		ElementType element3 = new ElementType(3);
-		inputCollection.add(element1);
-		inputCollection.add(element2);
-		inputCollection.add(element3);
-
-		@SuppressWarnings("unchecked")
-		TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
-
-		CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
-				info.createSerializer());
-
-		try{
+	public void testSerializability() {
+		try {
+			Collection<ElementType> inputCollection = new ArrayList<ElementType>();
+			ElementType element1 = new ElementType(1);
+			ElementType element2 = new ElementType(2);
+			ElementType element3 = new ElementType(3);
+			inputCollection.add(element1);
+			inputCollection.add(element2);
+			inputCollection.add(element3);
+	
+			@SuppressWarnings("unchecked")
+			TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
+	
+			CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
+					info.createSerializer());
+
 			ByteArrayOutputStream buffer = new ByteArrayOutputStream();
 			ObjectOutputStream out = new ObjectOutputStream(buffer);
 
@@ -108,10 +118,10 @@ public class CollectionInputFormatTest {
 
 				assertEquals(expectedElement, actualElement);
 			}
-		}catch(IOException ex){
-			fail(ex.toString());
-		}catch(ClassNotFoundException ex){
-			fail(ex.toString());
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.toString());
 		}
 	}
 	
@@ -157,7 +167,6 @@ public class CollectionInputFormatTest {
 		};
 		
 		try {
-			
 			List<String> inputCollection = Arrays.asList(data);
 			CollectionInputFormat<String> inputFormat = new CollectionInputFormat<String>(inputCollection, BasicTypeInfo.STRING_TYPE_INFO.createSerializer());
 			
@@ -190,4 +199,139 @@ public class CollectionInputFormatTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testSerializationFailure() {
+		try {
+			// a mock serializer that fails when writing
+			CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
+					Collections.singleton(new ElementType()), new TestSerializer(false, true));
+			
+			ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+			ObjectOutputStream out = new ObjectOutputStream(buffer);
+			
+			try {
+				out.writeObject(inFormat);
+				fail("should throw an exception");
+			}
+			catch (TestException e) {
+				// expected
+			}
+			catch (Exception e) {
+				fail("Exception not properly forwarded");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testDeserializationFailure() {
+		try {
+			// a mock serializer that fails when reading
+			CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
+					Collections.singleton(new ElementType()), new TestSerializer(true, false));
+			
+			ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+			ObjectOutputStream out = new ObjectOutputStream(buffer);
+			out.writeObject(inFormat);
+			out.close();
+			
+			ByteArrayInputStream bais = new ByteArrayInputStream(buffer.toByteArray());
+			ObjectInputStream in = new ObjectInputStream(bais);
+			
+			try {
+				in.readObject();
+				fail("should throw an exception");
+			}
+			catch (TestException e) {
+				// expected
+			}
+			catch (Exception e) {
+				fail("Exception not properly forwarded");
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static class TestException extends IOException{
+		private static final long serialVersionUID = 1L;
+	}
+	
+	private static class TestSerializer extends TypeSerializer<ElementType> {
+
+		private static final long serialVersionUID = 1L;
+		
+		private boolean failOnRead;
+		private boolean failOnWrite;
+		
+		public TestSerializer(boolean failOnRead, boolean failOnWrite) {
+			this.failOnRead = failOnRead;
+			this.failOnWrite = failOnWrite;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public boolean isStateful() {
+			return false;
+		}
+
+		@Override
+		public ElementType createInstance() {
+			return new ElementType();
+		}
+
+		@Override
+		public ElementType copy(ElementType from) {
+			return from;
+		}
+
+		@Override
+		public ElementType copy(ElementType from, ElementType reuse) {
+			return from;
+		}
+
+		@Override
+		public int getLength() {
+			return 4;
+		}
+
+		@Override
+		public void serialize(ElementType record, DataOutputView target) throws IOException {
+			if (failOnWrite) {
+				throw new TestException();
+			}
+			target.writeInt(record.getId());
+		}
+
+		@Override
+		public ElementType deserialize(DataInputView source) throws IOException {
+			if (failOnRead) {
+				throw new TestException();
+			}
+			return new ElementType(source.readInt());
+		}
+
+		@Override
+		public ElementType deserialize(ElementType reuse, DataInputView source) throws IOException {
+			if (failOnRead) {
+				throw new TestException();
+			}
+			return new ElementType(source.readInt());
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			target.writeInt(source.readInt());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d7853fd3/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 0662aa6..5f10a2b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -20,6 +20,7 @@
 package org.apache.flink.api.java.io;
 
 import com.google.common.base.Charsets;
+
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -399,7 +400,8 @@ public class CsvInputFormatTest {
 
 		Tuple5<Integer, String, String, String, Double> result = new Tuple5<Integer, String, String, String, Double>();
 
-		Tuple5[] expectedLines = new Tuple5[]{
+		@SuppressWarnings("unchecked")
+		Tuple5<Integer, String, String, String, Double>[] expectedLines = new Tuple5[] {
 				new Tuple5<Integer, String, String, String, Double>(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
 				new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
 				new Tuple5<Integer, String, String, String, Double>(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.00),
@@ -408,8 +410,7 @@ public class CsvInputFormatTest {
 		};
 
 		try {
-
-			for (Tuple5 expected : expectedLines) {
+			for (Tuple5<Integer, String, String, String, Double> expected : expectedLines) {
 				result = format.nextRecord(result);
 				assertEquals(expected, result);
 			}


Mime
View raw message