flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] [FLINK-1005] Extend TypeSerializer interface to handle non-mutable object deserialization more efficiently.
Date Tue, 30 Sep 2014 14:57:14 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master ea4c8828c -> 76d4a75e8


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java
new file mode 100644
index 0000000..13aba1f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.FloatValue;
+
+/**
+ * A test for the {@link FloatValueSerializer}.
+ */
+public class FloatValueSerializerTest extends SerializerTestBase<FloatValue> {
+	
+	@Override
+	protected TypeSerializer<FloatValue> createSerializer() {
+		return new FloatValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 4;
+	}
+	
+	@Override
+	protected Class<FloatValue> getTypeClass() {
+		return FloatValue.class;
+	}
+	
+	@Override
+	protected FloatValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		float rndFloat = rnd.nextFloat() * Float.MAX_VALUE;
+		
+		return new FloatValue[] {new FloatValue(0), new FloatValue(1), new FloatValue(-1),
+							new FloatValue(Float.MAX_VALUE), new FloatValue(Float.MIN_VALUE),
+							new FloatValue(rndFloat), new FloatValue(-rndFloat),
+							new FloatValue(Float.NaN),
+							new FloatValue(Float.NEGATIVE_INFINITY), new FloatValue(Float.POSITIVE_INFINITY)};
+	}
+}	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java
new file mode 100644
index 0000000..80d63b0
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.IntValue;
+
+/**
+ * A test for the {@link IntValueSerializer}.
+ */
+public class IntValueSerializerTest extends SerializerTestBase<IntValue> {
+	
+	@Override
+	protected TypeSerializer<IntValue> createSerializer() {
+		return new IntValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 4;
+	}
+	
+	@Override
+	protected Class<IntValue> getTypeClass() {
+		return IntValue.class;
+	}
+	
+	@Override
+	protected IntValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		int rndInt = rnd.nextInt();
+		
+		return new IntValue[] {new IntValue(0), new IntValue(1), new IntValue(-1),
+							new IntValue(Integer.MAX_VALUE), new IntValue(Integer.MIN_VALUE),
+							new IntValue(rndInt), new IntValue(-rndInt)};
+	}
+}	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java
new file mode 100644
index 0000000..533b809
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.LongValue;
+
+/**
+ * A test for the {@link LongValueSerializer}.
+ */
+public class LongValueSerializerTest extends SerializerTestBase<LongValue> {
+	
+	@Override
+	protected TypeSerializer<LongValue> createSerializer() {
+		return new LongValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 8;
+	}
+	
+	@Override
+	protected Class<LongValue> getTypeClass() {
+		return LongValue.class;
+	}
+	
+	@Override
+	protected LongValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		long rndLong = rnd.nextLong();
+		
+		return new LongValue[] {new LongValue(0L), new LongValue(1L), new LongValue(-1L),
+							new LongValue(Long.MAX_VALUE), new LongValue(Long.MIN_VALUE),
+							new LongValue(rndLong), new LongValue(-rndLong)};
+	}
+}	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java
new file mode 100644
index 0000000..4ef6816
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ShortValue;
+
+/**
+ * A test for the {@link ShortValueSerializer}.
+ */
+public class ShortValueSerializerTest extends SerializerTestBase<ShortValue> {
+	
+	@Override
+	protected TypeSerializer<ShortValue> createSerializer() {
+		return new ShortValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 2;
+	}
+	
+	@Override
+	protected Class<ShortValue> getTypeClass() {
+		return ShortValue.class;
+	}
+	
+	@Override
+	protected ShortValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		int rndInt = rnd.nextInt(32767);
+		
+		return new ShortValue[] {new ShortValue((short) 0), new ShortValue((short) 1), new ShortValue((short) -1),
+							new ShortValue((short) rndInt), new ShortValue((short) -rndInt),
+							new ShortValue((short) -32767), new ShortValue((short) 32768)};
+	}
+}
+	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
new file mode 100644
index 0000000..c20cd92
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueSerializerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.StringValue;
+
+/**
+ * A test for the {@link StringValueSerializer}.
+ */
+public class StringValueSerializerTest extends SerializerTestBase<StringValue> {
+	
+	@Override
+	protected TypeSerializer<StringValue> createSerializer() {
+		return new StringValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+	
+	@Override
+	protected Class<StringValue> getTypeClass() {
+		return StringValue.class;
+	}
+	
+	@Override
+	protected StringValue[] getTestData() {
+		return new StringValue[] {
+				new StringValue("a"),
+				new StringValue(""),
+				new StringValue("bcd"),
+				new StringValue("jbmbmner8 jhk hj \n \t üäßß@µ"),
+				new StringValue(""),
+				new StringValue("non-empty")};
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/logback-test.xml b/flink-core/src/test/resources/logback-test.xml
index 8b3bb27..4f484cb 100644
--- a/flink-core/src/test/resources/logback-test.xml
+++ b/flink-core/src/test/resources/logback-test.xml
@@ -26,4 +26,8 @@
     <root level="WARN">
         <appender-ref ref="STDOUT"/>
     </root>
+    
+    <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
+    <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index d93030d..31a04c9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -36,7 +36,7 @@ import com.esotericsoftware.kryo.Kryo;
  *
  * @param <T> The type serialized.
  */
-public class AvroSerializer<T> extends TypeSerializer<T> {
+public final class AvroSerializer<T> extends TypeSerializer<T> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -89,10 +89,15 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+		return this.kryo.copy(from);
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
-		reuse = this.kryo.copy(from);
-		return reuse;
+		return this.kryo.copy(from);
 	}
 
 	@Override
@@ -106,6 +111,13 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 		this.encoder.setOut(target);
 		this.writer.write(value, this.encoder);
 	}
+	
+	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		checkAvroInitialized();
+		this.decoder.setIn(source);
+		return this.reader.read(null, this.decoder);
+	}
 
 	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
@@ -146,4 +158,21 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 			this.kryo.register(type);
 		}
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return 0x42fba55c + this.type.hashCode() + this.typeToInstantiate.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj.getClass() == AvroSerializer.class) {
+			AvroSerializer<?> other = (AvroSerializer<?>) obj;
+			return this.type == other.type && this.typeToInstantiate == other.typeToInstantiate;
+		} else {
+			return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index fa3a91e..9d12d7e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -55,7 +55,12 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
 	public T createInstance() {
 		return InstantiationUtil.instantiate(this.valueClass);
 	}
-
+	
+	@Override
+	public T copy(T from) {
+		return copy(from, createInstance());
+	}
+	
 	@Override
 	public T copy(T from, T reuse) {
 		from.copyTo(reuse);
@@ -74,6 +79,11 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
 	}
 
 	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		return deserialize(createInstance(), source);
+	}
+	
+	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;
@@ -92,4 +102,19 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
 			instance = createInstance();
 		}
 	}
+	
+	@Override
+	public int hashCode() {
+		return this.valueClass.hashCode() + 9231;
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj.getClass() == CopyableValueSerializer.class) {
+			CopyableValueSerializer<?> other = (CopyableValueSerializer<?>) obj;
+			return this.valueClass == other.valueClass;
+		} else {
+			return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
index 9e2a1f3..a3acb20 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils.runtime;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -75,10 +76,15 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+		return kryo.copy(from);
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
-		reuse = kryo.copy(from);
-		return reuse;
+		return kryo.copy(from);
 	}
 
 	@Override
@@ -100,15 +106,19 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
+	public T deserialize(DataInputView source) throws IOException {
 		checkKryoInitialized();
 		if (source != previousIn) {
 			DataInputViewStream inputStream = new DataInputViewStream(source);
 			input = new NoFetchingInput(inputStream);
 			previousIn = source;
 		}
-		reuse = kryo.readObject(input, typeToInstantiate);
-		return reuse;
+		return kryo.readObject(input, typeToInstantiate);
+	}
+	
+	@Override
+	public T deserialize(T reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override
@@ -121,6 +131,25 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 		T tmp = deserialize(copyInstance, source);
 		serialize(tmp, target);
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return type.hashCode() + 31 * typeToInstantiate.hashCode();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj instanceof KryoSerializer) {
+			KryoSerializer<?> other = (KryoSerializer<?>) obj;
+			return other.type == this.type && other.typeToInstantiate == this.typeToInstantiate;
+		} else {
+			return false;
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
 
 	private final void checkKryoInitialized() {
 		if (this.kryo == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index ed48c42..71f2cd8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -131,6 +131,28 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T copy(T from) {
+		T target;
+		try {
+			target = clazz.newInstance();
+		}
+		catch (Throwable t) {
+			throw new RuntimeException("Cannot instantiate class.", t);
+		}
+		
+		try {
+			for (int i = 0; i < numFields; i++) {
+				Object copy = fieldSerializers[i].copy(fields[i].get(from));
+				fields[i].set(target, copy);
+			}
+		}
+		catch (IllegalAccessException e) {
+			throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.");
+		}
+		return target;
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		try {
 			for (int i = 0; i < numFields; i++) {
@@ -165,6 +187,28 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		T target;
+		try {
+			target = clazz.newInstance();
+		}
+		catch (Throwable t) {
+			throw new RuntimeException("Cannot instantiate class.", t);
+		}
+		
+		try {
+			for (int i = 0; i < numFields; i++) {
+				Object field = fieldSerializers[i].deserialize(source);
+				fields[i].set(target, field);
+			}
+		} catch (IllegalAccessException e) {
+			throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" +
+					"before.");
+		}
+		return target;
+	}
+	
+	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		try {
 			for (int i = 0; i < numFields; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
index 5d5c08f..12bec12 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java
@@ -31,7 +31,6 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
 
 	private static final long serialVersionUID = 1L;
 	
-	@SuppressWarnings("unchecked")
 	public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
 		super(tupleClass, fieldSerializers);
 	}
@@ -69,6 +68,11 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
 	}
 
 	@Override
+	public T copy(T from) {
+		return copy(from, instantiateRaw());
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		for (int i = 0; i < arity; i++) {
 			Object copy = fieldSerializers[i].copy(from.getField(i), reuse.getField(i));
@@ -91,6 +95,16 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
 	}
 
 	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		T tuple = instantiateRaw();
+		for (int i = 0; i < arity; i++) {
+			Object field = fieldSerializers[i].deserialize(source);
+			tuple.setField(field, i);
+		}
+		return tuple;
+	}
+	
+	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		for (int i = 0; i < arity; i++) {
 			Object field = fieldSerializers[i].deserialize(reuse.getField(i), source);
@@ -98,4 +112,13 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
 		}
 		return reuse;
 	}
+	
+	private T instantiateRaw() {
+		try {
+			return tupleClass.newInstance();
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Cannot instantiate tuple.", e);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index db71b56..08df7d3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -28,6 +28,8 @@ import java.util.Arrays;
 
 public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
+	private static final long serialVersionUID = 1L;
+
 	protected final Class<T> tupleClass;
 
 	protected final TypeSerializer<Object>[] fieldSerializers;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index 8bea523..69a5ff6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -72,10 +72,15 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+		return this.kryo.copy(from);
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
-		reuse = this.kryo.copy(from);
-		return reuse;
+		return this.kryo.copy(from);
 	}
 
 	@Override
@@ -89,6 +94,11 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 	}
 
 	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		return deserialize(createInstance(), source);
+	}
+	
+	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;
@@ -111,4 +121,21 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
 			this.kryo.register(type);
 		}
 	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return this.type.hashCode() + 17;
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj.getClass() == ValueSerializer.class) {
+			ValueSerializer<?> other = (ValueSerializer<?>) obj;
+			return this.type == other.type;
+		} else {
+			return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 6fcf730..d5a0470 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -48,10 +48,15 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	}
 	
 	@Override
+	public T copy(T from) {
+		checkKryoInitialized();
+		return this.kryo.copy(from);
+	}
+	
+	@Override
 	public T copy(T from, T reuse) {
 		checkKryoInitialized();
-		reuse = this.kryo.copy(from);
-		return reuse;
+		return this.kryo.copy(from);
 	}
 	
 	@Override
@@ -65,6 +70,11 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 	}
 	
 	@Override
+	public T deserialize(DataInputView source) throws IOException {
+		return deserialize(createInstance(), source);
+	}
+	
+	@Override
 	public T deserialize(T reuse, DataInputView source) throws IOException {
 		reuse.readFields(source);
 		return reuse;
@@ -102,4 +112,20 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
 			this.kryo.register(typeClass);
 		}
 	}
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public int hashCode() {
+		return this.typeClass.hashCode() + 177;
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj.getClass() == WritableSerializer.class) {
+			WritableSerializer<?> other = (WritableSerializer<?>) obj;
+			return this.typeClass == other.typeClass;
+		} else {
+			return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
index f3b63d3..2bc11f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
@@ -16,24 +16,21 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.resettable;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-import org.junit.Assert;
-
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.types.IntValueSerializer;
 import org.apache.flink.types.IntValue;
+import org.junit.Assert;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index 05b55da..2134bcd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.operators.testutils.types;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
@@ -44,9 +45,14 @@ public class IntListSerializer extends TypeSerializer<IntList> {
 	}
 	
 	@Override
+	public IntList copy(IntList from) {
+		return new IntList(from.getKey(), Arrays.copyOf(from.getValue(), from.getValue().length));
+	}
+	
+	@Override
 	public IntList copy(IntList from, IntList reuse) {
 		reuse.setKey(from.getKey());
-		reuse.setValue(from.getValue());
+		reuse.setValue(Arrays.copyOf(from.getValue(), from.getValue().length));
 		return reuse;
 	}
 	
@@ -74,6 +80,11 @@ public class IntListSerializer extends TypeSerializer<IntList> {
 	}
 
 	@Override
+	public IntList deserialize(DataInputView source) throws IOException {
+		return deserialize(new IntList(), source);
+	}
+	
+	@Override
 	public IntList deserialize(IntList record, DataInputView source) throws IOException {
 		int key = source.readInt();
 		record.setKey(key);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index 9533627..361585d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -46,6 +46,11 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
 	public IntPair createInstance() {
 		return new IntPair();
 	}
+	
+	@Override
+	public IntPair copy(IntPair from) {
+		return new IntPair(from.getKey(), from.getValue());
+	}
 
 	@Override
 	public IntPair copy(IntPair from, IntPair reuse) {
@@ -67,6 +72,11 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
 	}
 
 	@Override
+	public IntPair deserialize(DataInputView source) throws IOException {
+		return new IntPair(source.readInt(), source.readInt());
+	}
+	
+	@Override
 	public IntPair deserialize(IntPair reuse, DataInputView source) throws IOException {
 		reuse.setKey(source.readInt());
 		reuse.setValue(source.readInt());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java
deleted file mode 100644
index ab24e92..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntValueSerializer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.testutils.types;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.IntValue;
-
-
-public class IntValueSerializer extends TypeSerializer<IntValue> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public boolean isStateful() {
-		return false;
-	}
-	@Override
-	public IntValue createInstance() {
-		return new IntValue();
-	}
-
-	@Override
-	public IntValue copy(IntValue from, IntValue reuse) {
-		reuse.setValue(from.getValue());
-		return reuse;
-	}
-
-
-	@Override
-	public int getLength() {
-		return 4;
-	}
-
-	@Override
-	public void serialize(IntValue record, DataOutputView target) throws IOException {
-		target.writeInt(record.getValue());
-	}
-
-	@Override
-	public IntValue deserialize(IntValue reuse, DataInputView source) throws IOException {
-		reuse.setValue(source.readInt());
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		target.write(source, 4);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index 8ba46c2..a38633c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -45,6 +45,10 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
 	}
 	
 	@Override
+	public StringPair copy(StringPair from) {
+		return new StringPair(from.getKey(), from.getValue());
+	}
+	@Override
 	public StringPair copy(StringPair from, StringPair reuse) {
 		reuse.setKey(from.getKey());
 		reuse.setValue(from.getValue());
@@ -63,6 +67,11 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
 	}
 
 	@Override
+	public StringPair deserialize(DataInputView source) throws IOException {
+		return new StringPair(StringValue.readString(source), StringValue.readString(source));
+	}
+	
+	@Override
 	public StringPair deserialize(StringPair record, DataInputView source) throws IOException {
 		record.setKey(StringValue.readString(source));
 		record.setValue(StringValue.readString(source));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index b452b12..f9cd10c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -31,8 +31,11 @@ abstract class CaseClassSerializer[T <: Product](
     scalaFieldSerializers: Array[TypeSerializer[_]])
   extends TupleSerializerBase[T](clazz, scalaFieldSerializers) {
 
+  @transient var fields : Array[AnyRef] = _
+  
+  
   def createInstance: T = {
-    val fields: Array[AnyRef] = new Array(arity)
+    initArray()
     for (i <- 0 until arity) {
       fields(i) = fieldSerializers(i).createInstance()
     }
@@ -40,7 +43,11 @@ abstract class CaseClassSerializer[T <: Product](
   }
 
   def copy(from: T, reuse: T): T = {
-    val fields: Array[AnyRef] = new Array(arity)
+    copy(from)
+  }
+  
+  def copy(from: T): T = {
+    initArray()
     for (i <- 0 until arity) {
       fields(i) = from.productElement(i).asInstanceOf[AnyRef]
     }
@@ -55,11 +62,25 @@ abstract class CaseClassSerializer[T <: Product](
   }
 
   def deserialize(reuse: T, source: DataInputView): T = {
-    val fields: Array[AnyRef] = new Array(arity)
+    initArray()
     for (i <- 0 until arity) {
       val field = reuse.productElement(i).asInstanceOf[AnyRef]
       fields(i) = fieldSerializers(i).deserialize(field, source)
     }
     createInstance(fields)
   }
+  
+  def deserialize(source: DataInputView): T = {
+    initArray()
+    for (i <- 0 until arity) {
+      fields(i) = fieldSerializers(i).deserialize(source)
+    }
+    createInstance(fields)
+  }
+  
+  def initArray() = {
+    if (fields == null) {
+      fields = new Array[AnyRef](arity)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
index fdfd873..822b4f2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-public final class VertexWithAdjacencyListSerializer extends TypeSerializer<VertexWithAdjacencyList> {
+public final class VertexWithAdjacencyListSerializer extends TypeSerializerSingleton<VertexWithAdjacencyList> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -45,6 +45,14 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
 	}
 
 	@Override
+	public VertexWithAdjacencyList copy(VertexWithAdjacencyList from) {
+		VertexWithAdjacencyList copy = new VertexWithAdjacencyList(from.getVertexID(), new long[from.getNumTargets()]);
+		copy.setNumTargets(from.getNumTargets());
+		System.arraycopy(from.getTargets(), 0, copy.getTargets(), 0, from.getNumTargets());
+		return copy;
+	}
+	
+	@Override
 	public VertexWithAdjacencyList copy(VertexWithAdjacencyList from, VertexWithAdjacencyList reuse) {
 		if (reuse.getTargets().length < from.getTargets().length) {
 			reuse.setTargets(new long[from.getTargets().length]);
@@ -75,6 +83,11 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
 	}
 
 	@Override
+	public VertexWithAdjacencyList deserialize(DataInputView source) throws IOException {
+		return deserialize(new VertexWithAdjacencyList(), source);
+	}
+	
+	@Override
 	public VertexWithAdjacencyList deserialize(VertexWithAdjacencyList target, DataInputView source) throws IOException {
 		target.setVertexID(source.readLong());
 		
@@ -101,16 +114,4 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
 		target.writeInt(numTargets);
 		target.write(source, numTargets * 8);
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return 3;
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == VertexWithAdjacencyListSerializer.class;
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
index 5189203..e972cd1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<VertexWithRankAndDangling> {
+public final class VertexWithRankAndDanglingSerializer extends TypeSerializerSingleton<VertexWithRankAndDangling> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -45,6 +45,11 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
 	}
 
 	@Override
+	public VertexWithRankAndDangling copy(VertexWithRankAndDangling from) {
+		return new VertexWithRankAndDangling(from.getVertexID(), from.getRank(), from.isDangling());
+	}
+	
+	@Override
 	public VertexWithRankAndDangling copy(VertexWithRankAndDangling from, VertexWithRankAndDangling reuse) {
 		reuse.setVertexID(from.getVertexID());
 		reuse.setRank(from.getRank());
@@ -65,6 +70,11 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
 	}
 
 	@Override
+	public VertexWithRankAndDangling deserialize(DataInputView source) throws IOException {
+		return new VertexWithRankAndDangling(source.readLong(), source.readDouble(), source.readBoolean());
+	}
+	
+	@Override
 	public VertexWithRankAndDangling deserialize(VertexWithRankAndDangling target, DataInputView source) throws IOException {
 		target.setVertexID(source.readLong());
 		target.setRank(source.readDouble());
@@ -76,16 +86,4 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 17);
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return 2;
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == VertexWithRankAndDanglingSerializer.class;
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
index 1065633..928d4f4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java
@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRank> {
+public final class VertexWithRankSerializer extends TypeSerializerSingleton<VertexWithRank> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -43,6 +43,11 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
 	public VertexWithRank createInstance() {
 		return new VertexWithRank();
 	}
+	
+	@Override
+	public VertexWithRank copy(VertexWithRank from) {
+		return new VertexWithRank(from.getVertexID(), from.getRank());
+	}
 
 	@Override
 	public VertexWithRank copy(VertexWithRank from, VertexWithRank reuse) {
@@ -63,6 +68,11 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
 	}
 
 	@Override
+	public VertexWithRank deserialize(DataInputView source) throws IOException {
+		return new VertexWithRank(source.readLong(), source.readDouble());
+	}
+	
+	@Override
 	public VertexWithRank deserialize(VertexWithRank target, DataInputView source) throws IOException {
 		target.setVertexID(source.readLong());
 		target.setRank(source.readDouble());
@@ -73,16 +83,4 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		target.write(source, 16);
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return 1;
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == VertexWithRankSerializer.class;
-	}
 }


Mime
View raw message