flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] git commit: [FLINK-1005] Extend TypeSerializer interface to handle non-mutable object deserialization more efficiently.
Date Tue, 30 Sep 2014 14:57:15 GMT
[FLINK-1005] Extend TypeSerializer interface to handle non-mutable object deserialization more efficiently.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/76d4a75e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/76d4a75e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/76d4a75e

Branch: refs/heads/master
Commit: 76d4a75e823c31a899f2143fb6be185b90e55532
Parents: ea4c882
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Jul 7 19:39:24 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Sep 30 15:10:38 2014 +0200

----------------------------------------------------------------------
 .../streamrecord/StreamRecordSerializer.java    | 22 +++++-
 .../api/common/typeutils/TypeSerializer.java    | 42 ++++++-----
 .../typeutils/base/BooleanSerializer.java       | 14 +++-
 .../typeutils/base/BooleanValueSerializer.java  | 15 +++-
 .../common/typeutils/base/ByteSerializer.java   | 15 +++-
 .../typeutils/base/ByteValueSerializer.java     | 13 +++-
 .../common/typeutils/base/CharSerializer.java   | 15 +++-
 .../typeutils/base/CharValueSerializer.java     | 14 +++-
 .../common/typeutils/base/DoubleSerializer.java | 16 +++--
 .../typeutils/base/DoubleValueSerializer.java   | 13 +++-
 .../common/typeutils/base/FloatSerializer.java  | 16 +++--
 .../typeutils/base/FloatValueSerializer.java    | 15 +++-
 .../typeutils/base/GenericArraySerializer.java  | 38 ++++++++--
 .../common/typeutils/base/IntSerializer.java    | 15 +++-
 .../typeutils/base/IntValueSerializer.java      | 13 +++-
 .../common/typeutils/base/LongSerializer.java   | 15 +++-
 .../typeutils/base/LongValueSerializer.java     | 13 +++-
 .../common/typeutils/base/ShortSerializer.java  | 15 +++-
 .../typeutils/base/ShortValueSerializer.java    | 13 +++-
 .../common/typeutils/base/StringSerializer.java | 15 +++-
 .../typeutils/base/StringValueSerializer.java   | 40 ++++++++++-
 .../typeutils/base/TypeSerializerSingleton.java | 38 ++++++++++
 .../array/BooleanPrimitiveArraySerializer.java  | 26 ++++---
 .../array/BytePrimitiveArraySerializer.java     | 27 ++++---
 .../array/CharPrimitiveArraySerializer.java     | 27 ++++---
 .../array/DoublePrimitiveArraySerializer.java   | 29 +++++---
 .../array/FloatPrimitiveArraySerializer.java    | 27 ++++---
 .../base/array/IntPrimitiveArraySerializer.java | 27 ++++---
 .../array/LongPrimitiveArraySerializer.java     | 29 +++++---
 .../array/ShortPrimitiveArraySerializer.java    | 27 ++++---
 .../base/array/StringArraySerializer.java       | 30 +++++---
 .../typeutils/record/RecordSerializer.java      | 11 ++-
 .../common/typeutils/SerializerTestBase.java    | 49 ++++++++++++-
 .../typeutils/SerializerTestInstance.java       |  4 +-
 .../base/BooleanValueSerializerTest.java        | 56 +++++++++++++++
 .../typeutils/base/ByteValueSerializerTest.java | 57 +++++++++++++++
 .../typeutils/base/CharValueSerializerTest.java | 56 +++++++++++++++
 .../base/DoubleValueSerializerTest.java         | 58 +++++++++++++++
 .../base/FloatValueSerializerTest.java          | 58 +++++++++++++++
 .../typeutils/base/IntValueSerializerTest.java  | 56 +++++++++++++++
 .../typeutils/base/LongValueSerializerTest.java | 56 +++++++++++++++
 .../base/ShortValueSerializerTest.java          | 57 +++++++++++++++
 .../base/StringValueSerializerTest.java         | 55 ++++++++++++++
 flink-core/src/test/resources/logback-test.xml  |  4 ++
 .../java/typeutils/runtime/AvroSerializer.java  | 35 ++++++++-
 .../runtime/CopyableValueSerializer.java        | 27 ++++++-
 .../java/typeutils/runtime/KryoSerializer.java  | 39 ++++++++--
 .../java/typeutils/runtime/PojoSerializer.java  | 44 ++++++++++++
 .../java/typeutils/runtime/TupleSerializer.java | 25 ++++++-
 .../typeutils/runtime/TupleSerializerBase.java  |  2 +
 .../java/typeutils/runtime/ValueSerializer.java | 31 +++++++-
 .../typeutils/runtime/WritableSerializer.java   | 30 +++++++-
 .../SpillingResettableIteratorTest.java         |  7 +-
 .../testutils/types/IntListSerializer.java      | 13 +++-
 .../testutils/types/IntPairSerializer.java      | 10 +++
 .../testutils/types/IntValueSerializer.java     | 75 --------------------
 .../testutils/types/StringPairSerializer.java   |  9 +++
 .../scala/typeutils/CaseClassSerializer.scala   | 27 ++++++-
 .../VertexWithAdjacencyListSerializer.java      | 29 ++++----
 .../VertexWithRankAndDanglingSerializer.java    | 26 ++++---
 .../types/VertexWithRankSerializer.java         | 26 ++++---
 61 files changed, 1405 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
index 932bae0..85faa9e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java
@@ -62,6 +62,15 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 			throw new RuntimeException("Cannot instantiate StreamRecord.", e);
 		}
 	}
+	
+	@Override
+	public StreamRecord<T> copy(StreamRecord<T> from) {
+		StreamRecord<T> rec = new StreamRecord<T>();
+		rec.isTuple = from.isTuple;
+		rec.setId(from.getId().copy());
+		rec.setObject(typeSerializer.copy(from.getObject()));
+		return rec;
+	}
 
 	@Override
 	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
@@ -81,10 +90,18 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 		value.getId().write(target);
 		typeSerializer.serialize(value.getObject(), target);
 	}
+	
+	@Override
+	public StreamRecord<T> deserialize(DataInputView source) throws IOException {
+		StreamRecord<T> record = new StreamRecord<T>();
+		record.isTuple = this.isTuple;
+		record.getId().read(source);
+		record.setObject(typeSerializer.deserialize(source));
+		return record;
+	}
 
 	@Override
-	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source)
-			throws IOException {
+	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
 		reuse.getId().read(source);
 		reuse.setObject(typeSerializer.deserialize(reuse.getObject(), source));
 		return reuse;
@@ -94,5 +111,4 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
 		// Needs to be implemented
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 9be4a89..87d7e20 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -73,10 +73,21 @@ public abstract class TypeSerializer<T> implements Serializable {
 	public abstract T createInstance();
 
 	/**
-	 * Creates a copy from the given element, storing the copied result in the given reuse element if type is mutable.
+	 * Creates a deep copy of the given element in a new element.
 	 * 
 	 * @param from The element reuse be copied.
-	 * @param reuse The element to be reused.
+	 * @return A deep copy of the element.
+	 */
+	public abstract T copy(T from);
+	
+	/**
+	 * Creates a copy from the given element.
+	 * The method makes an attempt to store the copy in the given reuse element, if the type is mutable.
+	 * This is, however, not guaranteed.
+	 * 
+	 * @param from The element to be copied.
+	 * @param reuse The element to be reused. May or may not be used.
+	 * @return A deep copy of the element.
 	 */
 	public abstract T copy(T from, T reuse);
 	
@@ -103,10 +114,22 @@ public abstract class TypeSerializer<T> implements Serializable {
 	public abstract void serialize(T record, DataOutputView target) throws IOException;
 
 	/**
+	 * De-serializes a record from the given source input view.
+	 * 
+	 * @param source The input view from which to read the data.
+	 * @return The deserialized element.
+	 * 
+	 * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
+	 *                     input view, which may have an underlying I/O channel from which it reads.
+	 */
+	public abstract T deserialize(DataInputView source) throws IOException;
+	
+	/**
 	 * De-serializes a record from the given source input view into the given reuse record instance if mutable.
 	 * 
 	 * @param reuse The record instance into which to de-serialize the data.
 	 * @param source The input view from which to read the data.
+	 * @return The deserialized element.
 	 * 
 	 * @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
 	 *                     input view, which may have an underlying I/O channel from which it reads.
@@ -126,19 +149,4 @@ public abstract class TypeSerializer<T> implements Serializable {
 	 * @throws IOException Thrown if any of the two views raises an exception.
 	 */
 	public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
-	
-	// --------------------------------------------------------------------------------------------
-	//  Default Utilities: Hash code and equals are pre-defined for singleton serializers, where
-	//                     all instances are equal
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public int hashCode() {
-		return getClass().hashCode();
-	}
-	
-	@Override
-	public boolean equals(Object obj) {
-		return obj != null && obj.getClass() == this.getClass();
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
index 02a72c5..ecfb3c2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-
-public class BooleanSerializer extends TypeSerializer<Boolean> {
+public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +47,11 @@ public class BooleanSerializer extends TypeSerializer<Boolean> {
 	}
 
 	@Override
+	public Boolean copy(Boolean from) {
+		return from;
+	}
+	
+	@Override
 	public Boolean copy(Boolean from, Boolean reuse) {
 		return from;
 	}
@@ -64,6 +67,11 @@ public class BooleanSerializer extends TypeSerializer<Boolean> {
 	}
 
 	@Override
+	public Boolean deserialize(DataInputView source) throws IOException {
+		return Boolean.valueOf(source.readBoolean());
+	}
+	
+	@Override
 	public Boolean deserialize(Boolean reuse, DataInputView source) throws IOException {
 		return Boolean.valueOf(source.readBoolean());
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
index ddab3bb..4795055 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.BooleanValue;
 
 
-public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
+public final class BooleanValueSerializer extends TypeSerializerSingleton<BooleanValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,13 @@ public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
 	}
 
 	@Override
+	public BooleanValue copy(BooleanValue from) {
+		BooleanValue result = new BooleanValue();
+		result.setValue(from.getValue());
+		return result;
+	}
+	
+	@Override
 	public BooleanValue copy(BooleanValue from, BooleanValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +71,11 @@ public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
 	}
 
 	@Override
+	public BooleanValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new BooleanValue(), source);
+	}
+	
+	@Override
 	public BooleanValue deserialize(BooleanValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
index df52c13..32f3edd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 
-public class ByteSerializer extends TypeSerializer<Byte> {
+public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class ByteSerializer extends TypeSerializer<Byte> {
 	}
 
 	@Override
+	public Byte copy(Byte from) {
+		return from;
+	}
+	
+	@Override
 	public Byte copy(Byte from, Byte reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class ByteSerializer extends TypeSerializer<Byte> {
 	}
 
 	@Override
-	public Byte deserialize(Byte reuse, DataInputView source) throws IOException {
+	public Byte deserialize(DataInputView source) throws IOException {
 		return Byte.valueOf(source.readByte());
 	}
+	
+	@Override
+	public Byte deserialize(Byte reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
index ab3df38..24cc98b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.ByteValue;
 
 
-public class ByteValueSerializer extends TypeSerializer<ByteValue> {
+public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class ByteValueSerializer extends TypeSerializer<ByteValue> {
 	}
 
 	@Override
+	public ByteValue copy(ByteValue from) {
+		return copy(from, new ByteValue());
+	}
+	
+	@Override
 	public ByteValue copy(ByteValue from, ByteValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class ByteValueSerializer extends TypeSerializer<ByteValue> {
 	}
 
 	@Override
+	public ByteValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new ByteValue(), source);
+	}
+	
+	@Override
 	public ByteValue deserialize(ByteValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
index 4ef9a56..c46d3a0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 
-public class CharSerializer extends TypeSerializer<Character> {
+public final class CharSerializer extends TypeSerializerSingleton<Character> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class CharSerializer extends TypeSerializer<Character> {
 	}
 
 	@Override
+	public Character copy(Character from) {
+		return from;
+	}
+	
+	@Override
 	public Character copy(Character from, Character reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class CharSerializer extends TypeSerializer<Character> {
 	}
 
 	@Override
-	public Character deserialize(Character reuse, DataInputView source) throws IOException {
+	public Character deserialize(DataInputView source) throws IOException {
 		return Character.valueOf(source.readChar());
 	}
+	
+	@Override
+	public Character deserialize(Character reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
index 4946743..71a8ef4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java
@@ -20,13 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.CharValue;
 
-
-public class CharValueSerializer extends TypeSerializer<CharValue> {
+public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -47,6 +45,11 @@ public class CharValueSerializer extends TypeSerializer<CharValue> {
 	public CharValue createInstance() {
 		return new CharValue();
 	}
+	
+	@Override
+	public CharValue copy(CharValue from) {
+		return copy(from, new CharValue());
+	}
 
 	@Override
 	public CharValue copy(CharValue from, CharValue reuse) {
@@ -63,6 +66,11 @@ public class CharValueSerializer extends TypeSerializer<CharValue> {
 	public void serialize(CharValue record, DataOutputView target) throws IOException {
 		record.write(target);
 	}
+	
+	@Override
+	public CharValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new CharValue(), source);
+	}
 
 	@Override
 	public CharValue deserialize(CharValue reuse, DataInputView source) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
index fc8f55d..8e09f7c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-
-public class DoubleSerializer extends TypeSerializer<Double> {
+public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +48,11 @@ public class DoubleSerializer extends TypeSerializer<Double> {
 	}
 
 	@Override
+	public Double copy(Double from) {
+		return from;
+	}
+	
+	@Override
 	public Double copy(Double from, Double reuse) {
 		return from;
 	}
@@ -65,9 +68,14 @@ public class DoubleSerializer extends TypeSerializer<Double> {
 	}
 
 	@Override
-	public Double deserialize(Double reuse, DataInputView source) throws IOException {
+	public Double deserialize(DataInputView source) throws IOException {
 		return Double.valueOf(source.readDouble());
 	}
+	
+	@Override
+	public Double deserialize(Double reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
index a19f83e..f4c7f37 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.DoubleValue;
 
 
-public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
+public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
 	}
 
 	@Override
+	public DoubleValue copy(DoubleValue from) {
+		return copy(from, new DoubleValue());
+	}
+	
+	@Override
 	public DoubleValue copy(DoubleValue from, DoubleValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
 	}
 
 	@Override
+	public DoubleValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new DoubleValue(), source);
+	}
+	
+	@Override
 	public DoubleValue deserialize(DoubleValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
index c0c7917..b1a46b0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java
@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-
-public class FloatSerializer extends TypeSerializer<Float> {
+public final class FloatSerializer extends TypeSerializerSingleton<Float> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +48,11 @@ public class FloatSerializer extends TypeSerializer<Float> {
 	}
 
 	@Override
+	public Float copy(Float from) {
+		return from;
+	}
+	
+	@Override
 	public Float copy(Float from, Float reuse) {
 		return from;
 	}
@@ -65,9 +68,14 @@ public class FloatSerializer extends TypeSerializer<Float> {
 	}
 
 	@Override
-	public Float deserialize(Float reuse, DataInputView source) throws IOException {
+	public Float deserialize(DataInputView source) throws IOException {
 		return Float.valueOf(source.readFloat());
 	}
+	
+	@Override
+	public Float deserialize(Float reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
index 5660dea..6ebb268 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.FloatValue;
 
 
-public class FloatValueSerializer extends TypeSerializer<FloatValue> {
+public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
 	}
 
 	@Override
+	public FloatValue copy(FloatValue from) {
+		return copy(from, new FloatValue());
+	}
+	
+	@Override
 	public FloatValue copy(FloatValue from, FloatValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
 	}
 
 	@Override
+	public FloatValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new FloatValue(), source);
+	}
+	
+	@Override
 	public FloatValue deserialize(FloatValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;
@@ -72,6 +81,6 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		target.writeDouble(source.readFloat());
+		target.writeFloat(source.readFloat());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index cfdb132..504b41b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -27,9 +27,11 @@ import org.apache.flink.core.memory.DataOutputView;
 
 
 /**
+ * A serializer for arrays of objects.
+ * 
  * @param <C> The component type
  */
-public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
+public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -40,7 +42,6 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	private final C[] EMPTY;
 	
 	
-	
 	public GenericArraySerializer(Class<C> componentClass, TypeSerializer<C> componentSerializer) {
 		if (componentClass == null || componentSerializer == null) {
 			throw new NullPointerException();
@@ -68,7 +69,7 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	}
 
 	@Override
-	public C[] copy(C[] from, C[] reuse) {
+	public C[] copy(C[] from) {
 		C[] copy = create(from.length);
 
 		for (int i = 0; i < copy.length; i++) {
@@ -77,6 +78,11 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 
 		return copy;
 	}
+	
+	@Override
+	public C[] copy(C[] from, C[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -98,6 +104,24 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	}
 
 	@Override
+	public C[] deserialize(DataInputView source) throws IOException {
+		int len = source.readInt();
+		
+		C[] array = create(len);
+		
+		for (int i = 0; i < len; i++) {
+			boolean isNonNull = source.readBoolean();
+			if (isNonNull) {
+				array[i] = componentSerializer.deserialize(source);
+			} else {
+				array[i] = null;
+			}
+		}
+		
+		return array;
+	}
+	
+	@Override
 	public C[] deserialize(C[] reuse, DataInputView source) throws IOException {
 		int len = source.readInt();
 		
@@ -108,7 +132,13 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 		for (int i = 0; i < len; i++) {
 			boolean isNonNull = source.readBoolean();
 			if (isNonNull) {
-				reuse[i] = componentSerializer.deserialize(componentSerializer.createInstance(), source);
+				C ri = reuse[i];
+				if (ri == null) {
+					ri = componentSerializer.deserialize(source);
+				} else {
+					ri = componentSerializer.deserialize(ri, source);
+				}
+				reuse[i] = ri;
 			} else {
 				reuse[i] = null;
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
index 28192cd..2937b2a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 
-public class IntSerializer extends TypeSerializer<Integer> {
+public final class IntSerializer extends TypeSerializerSingleton<Integer> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class IntSerializer extends TypeSerializer<Integer> {
 	}
 
 	@Override
+	public Integer copy(Integer from) {
+		return from;
+	}
+	
+	@Override
 	public Integer copy(Integer from, Integer reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class IntSerializer extends TypeSerializer<Integer> {
 	}
 
 	@Override
-	public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+	public Integer deserialize(DataInputView source) throws IOException {
 		return Integer.valueOf(source.readInt());
 	}
+	
+	@Override
+	public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
index 9cf72d9..ec1f345 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.IntValue;
 
 
-public class IntValueSerializer extends TypeSerializer<IntValue> {
+public final class IntValueSerializer extends TypeSerializerSingleton<IntValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class IntValueSerializer extends TypeSerializer<IntValue> {
 	}
 
 	@Override
+	public IntValue copy(IntValue from) {
+		return copy(from, new IntValue());
+	}
+	
+	@Override
 	public IntValue copy(IntValue from, IntValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class IntValueSerializer extends TypeSerializer<IntValue> {
 	}
 
 	@Override
+	public IntValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new IntValue(), source);
+	}
+	
+	@Override
 	public IntValue deserialize(IntValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
index 2ca2b84..6b25596 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 
-public class LongSerializer extends TypeSerializer<Long> {
+public final class LongSerializer extends TypeSerializerSingleton<Long> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class LongSerializer extends TypeSerializer<Long> {
 	}
 
 	@Override
+	public Long copy(Long from) {
+		return from;
+	}
+	
+	@Override
 	public Long copy(Long from, Long reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class LongSerializer extends TypeSerializer<Long> {
 	}
 
 	@Override
-	public Long deserialize(Long reuse, DataInputView source) throws IOException {
+	public Long deserialize(DataInputView source) throws IOException {
 		return Long.valueOf(source.readLong());
 	}
+	
+	@Override
+	public Long deserialize(Long reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
index ea45dcc..95caf04 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.LongValue;
 
 
-public class LongValueSerializer extends TypeSerializer<LongValue> {
+public final class LongValueSerializer extends TypeSerializerSingleton<LongValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class LongValueSerializer extends TypeSerializer<LongValue> {
 	}
 
 	@Override
+	public LongValue copy(LongValue from) {
+		return copy(from, new LongValue());
+	}
+	
+	@Override
 	public LongValue copy(LongValue from, LongValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class LongValueSerializer extends TypeSerializer<LongValue> {
 	}
 
 	@Override
+	public LongValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new LongValue(), source);
+	}
+	
+	@Override
 	public LongValue deserialize(LongValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
index 156732c..c6e7870 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java
@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 
-public class ShortSerializer extends TypeSerializer<Short> {
+public final class ShortSerializer extends TypeSerializerSingleton<Short> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class ShortSerializer extends TypeSerializer<Short> {
 	}
 
 	@Override
+	public Short copy(Short from) {
+		return from;
+	}
+	
+	@Override
 	public Short copy(Short from, Short reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class ShortSerializer extends TypeSerializer<Short> {
 	}
 
 	@Override
-	public Short deserialize(Short reuse, DataInputView source) throws IOException {
+	public Short deserialize(DataInputView source) throws IOException {
 		return Short.valueOf(source.readShort());
 	}
+	
+	@Override
+	public Short deserialize(Short reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
index ac46b46..ab58987 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.ShortValue;
 
 
-public class ShortValueSerializer extends TypeSerializer<ShortValue> {
+public final class ShortValueSerializer extends TypeSerializerSingleton<ShortValue> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -49,6 +48,11 @@ public class ShortValueSerializer extends TypeSerializer<ShortValue> {
 	}
 
 	@Override
+	public ShortValue copy(ShortValue from) {
+		return copy(from, new ShortValue());
+	}
+	
+	@Override
 	public ShortValue copy(ShortValue from, ShortValue reuse) {
 		reuse.setValue(from.getValue());
 		return reuse;
@@ -65,6 +69,11 @@ public class ShortValueSerializer extends TypeSerializer<ShortValue> {
 	}
 
 	@Override
+	public ShortValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new ShortValue(), source);
+	}
+	
+	@Override
 	public ShortValue deserialize(ShortValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
index d71b521..71221a2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
 
 
-public class StringSerializer extends TypeSerializer<String> {
+public final class StringSerializer extends TypeSerializerSingleton<String> {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,6 +49,11 @@ public class StringSerializer extends TypeSerializer<String> {
 	}
 
 	@Override
+	public String copy(String from) {
+		return from;
+	}
+	
+	@Override
 	public String copy(String from, String reuse) {
 		return from;
 	}
@@ -65,9 +69,14 @@ public class StringSerializer extends TypeSerializer<String> {
 	}
 
 	@Override
-	public String deserialize(String record, DataInputView source) throws IOException {
+	public String deserialize(DataInputView source) throws IOException {
 		return StringValue.readString(source);
 	}
+	
+	@Override
+	public String deserialize(String record, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
index 352330f..c5d5b55 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java
@@ -20,16 +20,17 @@ package org.apache.flink.api.common.typeutils.base;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
 
 
-public class StringValueSerializer extends TypeSerializer<StringValue> {
+public final class StringValueSerializer extends TypeSerializerSingleton<StringValue> {
 
 	private static final long serialVersionUID = 1L;
 	
+	private static final int HIGH_BIT = 0x1 << 7;
+	
 	public static final StringValueSerializer INSTANCE = new StringValueSerializer();
 	
 	
@@ -49,6 +50,11 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
 	}
 
 	@Override
+	public StringValue copy(StringValue from) {
+		return copy(from, new StringValue());
+	}
+	
+	@Override
 	public StringValue copy(StringValue from, StringValue reuse) {
 		reuse.setValue(from);
 		return reuse;
@@ -65,6 +71,11 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
 	}
 
 	@Override
+	public StringValue deserialize(DataInputView source) throws IOException {
+		return deserialize(new StringValue(), source);
+	}
+	
+	@Override
 	public StringValue deserialize(StringValue reuse, DataInputView source) throws IOException {
 		reuse.read(source);
 		return reuse;
@@ -72,6 +83,29 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		StringValue.copyString(source, target);
+		int len = source.readUnsignedByte();
+		target.writeByte(len);
+
+		if (len >= HIGH_BIT) {
+			int shift = 7;
+			int curr;
+			len = len & 0x7f;
+			while ((curr = source.readUnsignedByte()) >= HIGH_BIT) {
+				target.writeByte(curr);
+				len |= (curr & 0x7f) << shift;
+				shift += 7;
+			}
+			target.writeByte(curr);
+			len |= curr << shift;
+		}
+
+		for (int i = 0; i < len; i++) {
+			int c = source.readUnsignedByte();
+			target.writeByte(c);
+			while (c >= HIGH_BIT) {
+				c = source.readUnsignedByte();
+				target.writeByte(c);
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
new file mode 100644
index 0000000..979d5ab
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Base class for serializers whose instances are interchangeable: two serializers are
+ * considered equal whenever they have the same runtime class.
+ *
+ * <p>The hash code is derived from that same class, keeping {@code hashCode()} consistent
+ * with {@code equals(Object)}.
+ *
+ * @param <T> The type handled by the serializer
+ */
+public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
+
+	private static final long serialVersionUID = 8766687317209282373L;
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Returns a hash code based on the runtime class, so that all instances that are equal
+	 * according to {@link #equals(Object)} also share one hash code.
+	 *
+	 * @return the hash code of this serializer's runtime class
+	 */
+	@Override
+	public int hashCode() {
+		return getClass().hashCode();
+	}
+	
+	/**
+	 * Compares serializers by runtime class only.
+	 *
+	 * @param obj The object to compare with, may be {@code null}
+	 * @return {@code true} if {@code obj} is non-null and has exactly the class of this serializer
+	 */
+	@Override
+	public boolean equals(Object obj) {
+		return obj != null && obj.getClass() == this.getClass();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
index 0093463..e9941a8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for boolean arrays.
  */
-public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
+public final class BooleanPrimitiveArraySerializer extends TypeSerializerSingleton<boolean[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,13 +52,18 @@ public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
 	}
 
 	@Override
-	public boolean[] copy(boolean[] from, boolean[] reuse) {
+	public boolean[] copy(boolean[] from) {
 		boolean[] copy = new boolean[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
 
 	@Override
+	public boolean[] copy(boolean[] from, boolean[] reuse) {
+		return copy(from);
+	}
+
+	@Override
 	public int getLength() {
 		return -1;
 	}
@@ -79,15 +84,20 @@ public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
 
 
 	@Override
-	public boolean[] deserialize(boolean[] reuse, DataInputView source) throws IOException {
+	public boolean[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new boolean[len];
+		boolean[] result = new boolean[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readBoolean();
+			result[i] = source.readBoolean();
 		}
 		
-		return reuse;
+		return result;
+	}
+	
+	@Override
+	public boolean[] deserialize(boolean[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
index fa0638a..aaf867f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for byte arrays.
  */
-public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
+public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton<byte[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -51,11 +51,16 @@ public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
 	}
 
 	@Override
-	public byte[] copy(byte[] from, byte[] reuse) {
+	public byte[] copy(byte[] from) {
 		byte[] copy = new byte[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
+	
+	@Override
+	public byte[] copy(byte[] from, byte[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -74,13 +79,17 @@ public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
 		target.write(record);
 	}
 
-
 	@Override
-	public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+	public byte[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new byte[len];
-		source.readFully(reuse);
-		return reuse;
+		byte[] result = new byte[len];
+		source.readFully(result);
+		return result;
+	}
+	
+	@Override
+	public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
index 639d4b6..64632bd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for char arrays.
  */
-public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
+public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton<char[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,11 +52,16 @@ public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
 	}
 
 	@Override
-	public char[] copy(char[] from, char[] reuse) {
+	public char[] copy(char[] from) {
 		char[] copy = new char[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
+	
+	@Override
+	public char[] copy(char[] from, char[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -77,17 +82,21 @@ public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
 		}
 	}
 
-
 	@Override
-	public char[] deserialize(char[] reuse, DataInputView source) throws IOException {
+	public char[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new char[len];
+		char[] result = new char[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readChar();
+			result[i] = source.readChar();
 		}
 		
-		return reuse;
+		return result;
+	}
+
+	@Override
+	public char[] deserialize(char[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
index 2b089a3..846ae74 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for double arrays.
  */
-public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
+public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleton<double[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -50,15 +50,20 @@ public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
 	public double[] createInstance() {
 		return EMPTY;
 	}
-
+	
 	@Override
-	public double[] copy(double[] from, double[] reuse) {
+	public double[] copy(double[] from) {
 		double[] copy = new double[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
 
 	@Override
+	public double[] copy(double[] from, double[] reuse) {
+		return copy(from);
+	}
+
+	@Override
 	public int getLength() {
 		return -1;
 	}
@@ -77,17 +82,21 @@ public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
 		}
 	}
 
-
 	@Override
-	public double[] deserialize(double[] reuse, DataInputView source) throws IOException {
+	public double[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new double[len];
+		double[] result = new double[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readDouble();
+			result[i] = source.readDouble();
 		}
 		
-		return reuse;
+		return result;
+	}
+	
+	@Override
+	public double[] deserialize(double[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
index 897292e..8f42ac8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for float arrays.
  */
-public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
+public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton<float[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,11 +52,16 @@ public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
 	}
 
 	@Override
-	public float[] copy(float[] from, float[] reuse) {
+	public float[] copy(float[] from) {
 		float[] copy = new float[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
+	
+	@Override
+	public float[] copy(float[] from, float[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -77,17 +82,21 @@ public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
 		}
 	}
 
-
 	@Override
-	public float[] deserialize(float[] reuse, DataInputView source) throws IOException {
+	public float[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new float[len];
+		float[] result = new float[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readFloat();
+			result[i] = source.readFloat();
 		}
 		
-		return reuse;
+		return result;
+	}
+	
+	@Override
+	public float[] deserialize(float[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
index aeaf35e..2ab056c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for int arrays.
  */
-public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
+public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,11 +52,16 @@ public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
 	}
 
 	@Override
-	public int[] copy(int[] from, int[] reuse) {
+	public int[] copy(int[] from) {
 		int[] copy = new int[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
+	
+	@Override
+	public int[] copy(int[] from, int[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -77,17 +82,21 @@ public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
 		}
 	}
 
-
 	@Override
-	public int[] deserialize(int[] reuse, DataInputView source) throws IOException {
+	public int[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new int[len];
+		int[] result = new int[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readInt();
+			result[i] = source.readInt();
 		}
 		
-		return reuse;
+		return result;
+	}
+	
+	@Override
+	public int[] deserialize(int[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
index 953e8ff..5d34dfe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
  * A serializer for long arrays.
  */
-public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
+public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton<long[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,10 +52,15 @@ public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
 	}
 
 	@Override
+	public long[] copy(long[] from) {
+		long[] result = new long[from.length];
+		System.arraycopy(from, 0, result, 0, from.length);
+		return result;
+	}
+	
+	@Override
 	public long[] copy(long[] from, long[] reuse) {
-		reuse = new long[from.length];
-		System.arraycopy(from, 0, reuse, 0, from.length);
-		return reuse;
+		return copy(from);
 	}
 
 	@Override
@@ -77,17 +82,21 @@ public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
 		}
 	}
 
-
 	@Override
-	public long[] deserialize(long[] reuse, DataInputView source) throws IOException {
+	public long[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new long[len];
+		long[] array = new long[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readLong();
+			array[i] = source.readLong();
 		}
 		
-		return reuse;
+		return array;
+	}
+	
+	@Override
+	public long[] deserialize(long[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
index 014fd05..2f37033 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java
@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * A serializer for long arrays.
+ * A serializer for short arrays.
  */
-public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
+public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton<short[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -52,11 +52,16 @@ public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
 	}
 
 	@Override
-	public short[] copy(short[] from, short[] reuse) {
+	public short[] copy(short[] from) {
 		short[] copy = new short[from.length];
 		System.arraycopy(from, 0, copy, 0, from.length);
 		return copy;
 	}
+	
+	@Override
+	public short[] copy(short[] from, short[] reuse) {
+		return copy(from);
+	}
 
 	@Override
 	public int getLength() {
@@ -77,17 +82,21 @@ public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
 		}
 	}
 
-
 	@Override
-	public short[] deserialize(short[] reuse, DataInputView source) throws IOException {
+	public short[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new short[len];
+		short[] array = new short[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = source.readShort();
+			array[i] = source.readShort();
 		}
 		
-		return reuse;
+		return array;
+	}
+
+	@Override
+	public short[] deserialize(short[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
index c6c1826..d5ab030 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java
@@ -20,7 +20,7 @@ package org.apache.flink.api.common.typeutils.base.array;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
@@ -29,7 +29,7 @@ import org.apache.flink.types.StringValue;
 /**
  * A serializer for String arrays. Specialized for efficiency.
  */
-public class StringArraySerializer extends TypeSerializer<String[]>{
+public final class StringArraySerializer extends TypeSerializerSingleton<String[]>{
 
 	private static final long serialVersionUID = 1L;
 	
@@ -54,10 +54,15 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
 	}
 
 	@Override
+	public String[] copy(String[] from) {
+		String[] target = new String[from.length];
+		System.arraycopy(from, 0, target, 0, from.length);
+		return target;
+	}
+	
+	@Override
 	public String[] copy(String[] from, String[] reuse) {
-		reuse = new String[from.length];
-		System.arraycopy(from, 0, reuse, 0, from.length);
-		return reuse;
+		return copy(from);
 	}
 
 	@Override
@@ -65,7 +70,6 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
 		return -1;
 	}
 
-
 	@Override
 	public void serialize(String[] record, DataOutputView target) throws IOException {
 		if (record == null) {
@@ -79,17 +83,21 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
 		}
 	}
 
-
 	@Override
-	public String[] deserialize(String[] reuse, DataInputView source) throws IOException {
+	public String[] deserialize(DataInputView source) throws IOException {
 		final int len = source.readInt();
-		reuse = new String[len];
+		String[] array = new String[len];
 		
 		for (int i = 0; i < len; i++) {
-			reuse[i] = StringValue.readString(source);
+			array[i] = StringValue.readString(source);
 		}
 		
-		return reuse;
+		return array;
+	}
+	
+	@Override
+	public String[] deserialize(String[] reuse, DataInputView source) throws IOException {
+		return deserialize(source);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
index 7e8762e..7b72e89 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java
@@ -67,12 +67,16 @@ public final class RecordSerializer extends TypeSerializer<Record> {
 	}
 
 	@Override
+	public Record copy(Record from) {
+		return from.createCopy();
+	}
+	
+	@Override
 	public Record copy(Record from, Record reuse) {
 		from.copyTo(reuse);
 		return reuse;
 	}
 	
-
 	@Override
 	public int getLength() {
 		return -1;
@@ -86,6 +90,11 @@ public final class RecordSerializer extends TypeSerializer<Record> {
 	}
 
 	@Override
+	public Record deserialize(DataInputView source) throws IOException {
+		return deserialize(new Record(), source);
+	}
+	
+	@Override
 	public Record deserialize(Record target, DataInputView source) throws IOException {
 		target.deserialize(source);
 		return target;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 6ca3f6c..d509284 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -89,6 +89,24 @@ public abstract class SerializerTestBase<T> {
 	}
 	
 	@Test
+	public void testCopy() {
+		try {
+			TypeSerializer<T> serializer = getSerializer();
+			T[] testData = getData();
+			
+			for (T datum : testData) {
+				T copy = serializer.copy(datum);
+				deepEquals("Copied element is not equal to the original element.", datum, copy);
+			}
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
 	public void testCopyIntoNewElements() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
@@ -184,7 +202,36 @@ public abstract class SerializerTestBase<T> {
 	}
 	
 	@Test
-	public void testSerializeAsSequence() {
+	public void testSerializeAsSequenceNoReuse() {
+		try {
+			TypeSerializer<T> serializer = getSerializer();
+			T[] testData = getData();
+			
+			TestOutputView out = new TestOutputView();
+			for (T value : testData) {
+				serializer.serialize(value, out);
+			}
+			
+			TestInputView in = out.getInputView();
+			
+			int num = 0;
+			while (in.available() > 0) {
+				T deserialized = serializer.deserialize(in);
+				deepEquals("Deserialized value if wrong.", testData[num], deserialized);
+				num++;
+			}
+			
+			assertEquals("Wrong number of elements deserialized.", testData.length, num);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Exception in test: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSerializeAsSequenceReusingValues() {
 		try {
 			TypeSerializer<T> serializer = getSerializer();
 			T[] testData = getData();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
index 2eb52c0..5b63633 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestInstance.java
@@ -69,11 +69,13 @@ public class
 	public void testAll() {
 		testInstantiate();
 		testGetLength();
+		testCopy();
 		testCopyIntoNewElements();
 		testCopyIntoReusedElements();
 		testSerializeIndividually();
 		testSerializeIndividuallyReusingValues();
-		testSerializeAsSequence();
+		testSerializeAsSequenceNoReuse();
+		testSerializeAsSequenceReusingValues();
 		testSerializedCopyIndividually();
 		testSerializedCopyAsSequence();
 		testSerializabilityAndEquals();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
new file mode 100644
index 0000000..43f1a57
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.BooleanValue;
+
+/**
+ * A test for the {@link BooleanValueSerializer}.
+ */
+public class BooleanValueSerializerTest extends SerializerTestBase<BooleanValue> {
+	
+	@Override
+	protected TypeSerializer<BooleanValue> createSerializer() {
+		return new BooleanValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 1;
+	}
+	
+	@Override
+	protected Class<BooleanValue> getTypeClass() {
+		return BooleanValue.class;
+	}
+	
+	@Override
+	protected BooleanValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		
+		return new BooleanValue[] {new BooleanValue(true), new BooleanValue(false),
+								new BooleanValue(rnd.nextBoolean()),
+								new BooleanValue(rnd.nextBoolean()),
+								new BooleanValue(rnd.nextBoolean())};
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
new file mode 100644
index 0000000..0e16629
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.ByteValue;
+
+/**
+ * A test for the {@link ByteValueSerializer}.
+ */
+public class ByteValueSerializerTest extends SerializerTestBase<ByteValue> {
+	
+	@Override
+	protected TypeSerializer<ByteValue> createSerializer() {
+		return new ByteValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 1;
+	}
+	
+	@Override
+	protected Class<ByteValue> getTypeClass() {
+		return ByteValue.class;
+	}
+	
+	@Override
+	protected ByteValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		byte byteArray[] = new byte[1];
+		rnd.nextBytes(byteArray);
+		
+		return new ByteValue[] {new ByteValue((byte) 0), new ByteValue((byte) 1), new ByteValue((byte) -1), 
+							new ByteValue(Byte.MAX_VALUE), new ByteValue(Byte.MIN_VALUE),
+							new ByteValue(byteArray[0]), new ByteValue((byte) -byteArray[0])};
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
new file mode 100644
index 0000000..ac83666
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.CharValue;
+
+/**
+ * A test for the {@link CharValueSerializer}.
+ */
+public class CharValueSerializerTest extends SerializerTestBase<CharValue> {
+	
+	@Override
+	protected TypeSerializer<CharValue> createSerializer() {
+		return new CharValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 2;
+	}
+	
+	@Override
+	protected Class<CharValue> getTypeClass() {
+		return CharValue.class;
+	}
+	
+	@Override
+	protected CharValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		int rndInt = rnd.nextInt((int) Character.MAX_VALUE);
+		
+		return new CharValue[] {new CharValue('a'), new CharValue('@'), new CharValue('ä'),
+								new CharValue('1'), new CharValue((char) rndInt),
+								new CharValue(Character.MAX_VALUE), new CharValue(Character.MIN_VALUE)};
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/76d4a75e/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
new file mode 100644
index 0000000..c0a2b24
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.util.Random;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.DoubleValue;
+
+/**
+ * A test for the {@link DoubleValueSerializer}.
+ */
+public class DoubleValueSerializerTest extends SerializerTestBase<DoubleValue> {
+	
+	@Override
+	protected TypeSerializer<DoubleValue> createSerializer() {
+		return new DoubleValueSerializer();
+	}
+	
+	@Override
+	protected int getLength() {
+		return 8;
+	}
+	
+	@Override
+	protected Class<DoubleValue> getTypeClass() {
+		return DoubleValue.class;
+	}
+	
+	@Override
+	protected DoubleValue[] getTestData() {
+		Random rnd = new Random(874597969123412341L);
+		Double rndDouble = rnd.nextDouble() * Double.MAX_VALUE;
+		
+		return new DoubleValue[] {new DoubleValue(0), new DoubleValue(1), new DoubleValue(-1),
+							new DoubleValue(Double.MAX_VALUE), new DoubleValue(Double.MIN_VALUE),
+							new DoubleValue(rndDouble), new DoubleValue(-rndDouble),
+							new DoubleValue(Double.NaN),
+							new DoubleValue(Double.NEGATIVE_INFINITY), new DoubleValue(Double.POSITIVE_INFINITY)};
+	}
+}	


Mime
View raw message