flink-commits mailing list archives

From se...@apache.org
Subject flink git commit: [hotfix] Remove 'ByteArrayInputView' and replace deserialization in TypeInformationSerializationSchema with more efficient reusable buffers.
Date Mon, 01 Feb 2016 16:30:04 GMT
Repository: flink
Updated Branches:
  refs/heads/master 67b380d61 -> 92efcd34a


[hotfix] Remove 'ByteArrayInputView' and replace deserialization in TypeInformationSerializationSchema
with more efficient reusable buffers.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92efcd34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92efcd34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92efcd34

Branch: refs/heads/master
Commit: 92efcd34a5da2bccb07666f2c647974ea3e7c94f
Parents: 67b380d
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 1 14:39:24 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Feb 1 17:29:02 2016 +0100

----------------------------------------------------------------------
 .../typeutils/runtime/ByteArrayInputView.java   | 40 ----------------
 .../runtime/kryo/KryoClearedBufferTest.java     |  8 +++-
 .../runtime/util/DataInputDeserializer.java     | 48 ++++++++++++--------
 .../runtime/util/DataOutputSerializer.java      | 25 ++++++----
 ...eInformationKeyValueSerializationSchema.java | 44 +++++++++++++-----
 .../connectors/kafka/KafkaConsumerTestBase.java | 17 +++++--
 .../TypeInformationSerializationSchema.java     | 14 ++++--
 .../TypeInformationSerializationSchemaTest.java |  2 +-
 8 files changed, 109 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
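
A note on the pattern: instead of wrapping every incoming byte[] in a new, stream-backed
ByteArrayInputView (one allocation per record), a single DataInputDeserializer is now kept
and re-pointed at each message via setBuffer(). A minimal sketch of that usage follows; the
class and method names (ReusableDeserializerSketch, readRecord) are illustrative only and
not part of this commit.

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.runtime.util.DataInputDeserializer;

    import java.io.IOException;

    public class ReusableDeserializerSketch {

        private final TypeSerializer<String> serializer =
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig());

        // one deserializer instance, kept across records
        private final DataInputDeserializer in = new DataInputDeserializer();

        public String readRecord(byte[] message) throws IOException {
            // re-point the reusable buffer instead of allocating a new stream wrapper
            in.setBuffer(message, 0, message.length);
            return serializer.deserialize(in);
        }
    }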


http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
deleted file mode 100644
index 48d6a3d..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-public class ByteArrayInputView extends DataInputStream implements DataInputView {
-
-	public ByteArrayInputView(byte[] data) {
-		super(new ByteArrayInputStream(data));
-	}
-
-	@Override
-	public void skipBytesToRead(int numBytes) throws IOException {
-		while (numBytes > 0) {
-			int skipped = skipBytes(numBytes);
-			numBytes -= skipped;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
index ab2e45f..7572408 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java
@@ -22,13 +22,16 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.Serializable;
@@ -69,7 +72,8 @@ public class KryoClearedBufferTest {
 			// now the Kryo Output should have been cleared
 		}
 
-		TestRecord actualRecord = kryoSerializer.deserialize(new ByteArrayInputView(target.getBuffer()));
+		TestRecord actualRecord = kryoSerializer.deserialize(
+				new DataInputViewStreamWrapper(new ByteArrayInputStream(target.getBuffer())));
 
 		Assert.assertEquals(testRecord, actualRecord);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index e8e8f6d..bdccdd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.util;
 
 import java.io.EOFException;
@@ -31,7 +30,11 @@ import org.apache.flink.core.memory.MemoryUtils;
 /**
  * A simple and efficient deserializer for the {@link java.io.DataInput} interface.
  */
-public class DataInputDeserializer implements DataInputView {
+public class DataInputDeserializer implements DataInputView, java.io.Serializable {
+	
+	private static final long serialVersionUID = 1L;
+
+	// ------------------------------------------------------------------------
 	
 	private byte[] buffer;
 	
@@ -39,8 +42,9 @@ public class DataInputDeserializer implements DataInputView {
 
 	private int position;
 
-	public DataInputDeserializer() {
-	}
+	// ------------------------------------------------------------------------
+	
+	public DataInputDeserializer() {}
 	
 	public DataInputDeserializer(byte[] buffer, int start, int len) {
 		setBuffer(buffer, start, len);
@@ -50,6 +54,10 @@ public class DataInputDeserializer implements DataInputView {
 		setBuffer(buffer);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Changing buffers
+	// ------------------------------------------------------------------------
+	
 	public void setBuffer(ByteBuffer buffer) {
 		if (buffer.hasArray()) {
 			this.buffer = buffer.array();
@@ -311,44 +319,36 @@ public class DataInputDeserializer implements DataInputView {
 			return n;
 		}
 	}
-	
-	@SuppressWarnings("restriction")
-	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-	
-	@SuppressWarnings("restriction")
-	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-	
-	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
 
 	@Override
 	public void skipBytesToRead(int numBytes) throws IOException {
 		int skippedBytes = skipBytes(numBytes);
 
-		if(skippedBytes < numBytes){
+		if (skippedBytes < numBytes){
 			throw new EOFException("Could not skip " + numBytes +" bytes.");
 		}
 	}
 
 	@Override
 	public int read(byte[] b, int off, int len) throws IOException {
-		if(b == null){
+		if (b == null){
 			throw new NullPointerException("Byte array b cannot be null.");
 		}
 
-		if(off < 0){
+		if (off < 0){
 			throw new IndexOutOfBoundsException("Offset cannot be negative.");
 		}
 
-		if(len < 0){
+		if (len < 0){
 			throw new IndexOutOfBoundsException("Length cannot be negative.");
 		}
 
-		if(b.length - off < len){
+		if (b.length - off < len){
 			throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
 					".");
 		}
 
-		if(this.position >= this.end) {
+		if (this.position >= this.end) {
 			return -1;
 		} else {
 			int toRead = Math.min(this.end-this.position, len);
@@ -363,4 +363,16 @@ public class DataInputDeserializer implements DataInputView {
 	public int read(byte[] b) throws IOException {
 		return read(b, 0, b.length);
 	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("restriction")
+	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+	@SuppressWarnings("restriction")
+	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
index 0e93544..18940ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.util;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemoryUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,8 @@ public class DataOutputSerializer implements DataOutputView {
 	private static final Logger LOG = LoggerFactory.getLogger(DataOutputSerializer.class);
 	
 	private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024;
+
+	// ------------------------------------------------------------------------
 	
 	private final byte[] startBuffer;
 	
@@ -47,6 +50,8 @@ public class DataOutputSerializer implements DataOutputView {
 	private int position;
 
 	private ByteBuffer wrapper;
+
+	// ------------------------------------------------------------------------
 	
 	public DataOutputSerializer(int startSize) {
 		if (startSize < 1) {
@@ -303,14 +308,6 @@ public class DataOutputSerializer implements DataOutputView {
 		this.buffer = nb;
 		this.wrapper = ByteBuffer.wrap(this.buffer);
 	}
-	
-	@SuppressWarnings("restriction")
-	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-	
-	@SuppressWarnings("restriction")
-	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
-	
-	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
 
 	@Override
 	public void skipBytesToWrite(int numBytes) throws IOException {
@@ -330,4 +327,16 @@ public class DataOutputSerializer implements DataOutputView {
 		source.read(this.buffer, this.position, numBytes);
 		this.position += numBytes;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("restriction")
+	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+	@SuppressWarnings("restriction")
+	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
index a35c01e..a3f8ba1 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 
 import java.io.IOException;
@@ -46,10 +46,16 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 	/** The serializer for the value */
 	private final TypeSerializer<V> valueSerializer;
 
-	/** reusable output serialization buffers */
+	/** reusable input deserialization buffer */
+	private final DataInputDeserializer inputDeserializer;
+	
+	/** reusable output serialization buffer for the key */
 	private transient DataOutputSerializer keyOutputSerializer;
-	private transient DataOutputSerializer valueOutputSerializer;
 
+	/** reusable output serialization buffer for the value */
+	private transient DataOutputSerializer valueOutputSerializer;
+	
+	
 	/** The type information, to be returned by {@link #getProducedType()}. It is
 	 * transient, because it is not serializable. Note that this means that the type information
 	 * is not available at runtime, but only prior to the first serialization / deserialization */
@@ -68,11 +74,22 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 		this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
 		this.keySerializer = keyTypeInfo.createSerializer(ec);
 		this.valueSerializer = valueTypeInfo.createSerializer(ec);
+		this.inputDeserializer = new DataInputDeserializer();
 	}
 
+	/**
+	 * Creates a new de-/serialization schema for the given types. This constructor accepts the types
+	 * as classes and internally constructs the type information from the classes.
+	 * 
+	 * <p>If the types are parametrized and cannot be fully defined via classes, use the constructor
+	 * that accepts {@link TypeInformation} instead.
+	 * 
+	 * @param keyClass The class of the key de-/serialized by this schema.
+	 * @param valueClass The class of the value de-/serialized by this schema.
+	 * @param config The execution config, which is used to parametrize the type serializers.
+	 */
 	public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) {
-		//noinspection unchecked
-		this( (TypeInformation<K>) TypeExtractor.createTypeInfo(keyClass), (TypeInformation<V>) TypeExtractor.createTypeInfo(valueClass), config);
+		this(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass), config);
 	}
 
 	// ------------------------------------------------------------------------
@@ -81,12 +98,15 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 	@Override
 	public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
 		K key = null;
-		if(messageKey != null) {
-			key = keySerializer.deserialize(new ByteArrayInputView(messageKey));
-		}
 		V value = null;
-		if(message != null) {
-			value = valueSerializer.deserialize(new ByteArrayInputView(message));
+		
+		if (messageKey != null) {
+			inputDeserializer.setBuffer(messageKey, 0, messageKey.length);
+			key = keySerializer.deserialize(inputDeserializer);
+		}
+		if (message != null) {
+			inputDeserializer.setBuffer(message, 0, message.length);
+			value = valueSerializer.deserialize(inputDeserializer);
 		}
 		return new Tuple2<>(key, value);
 	}
@@ -104,7 +124,7 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 
 	@Override
 	public byte[] serializeKey(Tuple2<K, V> element) {
-		if(element.f0 == null) {
+		if (element.f0 == null) {
 			return null;
 		} else {
 			// key is not null. serialize it:
@@ -132,7 +152,7 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
 	@Override
 	public byte[] serializeValue(Tuple2<K, V> element) {
 		// if the value is null, its serialized value is null as well.
-		if(element.f1 == null) {
+		if (element.f1 == null) {
 			return null;
 		}
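
For reference, the schema changed above can be exercised on its own. The following round-trip
sketch uses only the constructor and the serializeKey / serializeValue / deserialize calls that
appear in this diff; the topic name, partition, and offset are placeholder values.

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;

    import java.io.IOException;

    public class KeyValueSchemaRoundTrip {

        public static void main(String[] args) throws IOException {
            TypeInformationKeyValueSerializationSchema<String, Integer> schema =
                    new TypeInformationKeyValueSerializationSchema<>(
                            String.class, Integer.class, new ExecutionConfig());

            Tuple2<String, Integer> original = new Tuple2<>("answer", 42);
            byte[] key = schema.serializeKey(original);
            byte[] value = schema.serializeValue(original);

            // deserialize() now re-points one internal DataInputDeserializer for the key
            // and the value instead of creating a new ByteArrayInputView for each
            Tuple2<String, Integer> copy = schema.deserialize(key, value, "test-topic", 0, 0L);
            System.out.println(copy);
        }
    }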
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 8592182..3d39869 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -39,9 +39,10 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
@@ -82,6 +83,7 @@ import org.junit.Assert;
 
 import org.junit.Rule;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -734,14 +736,16 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 	private static class Tuple2WithTopicDeserializationSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>> {
 
-		TypeSerializer ts;
+		private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+		
 		public Tuple2WithTopicDeserializationSchema(ExecutionConfig ec) {
-			ts = TypeInfoParser.parse("Tuple2<Integer, Integer>").createSerializer(ec);
+			ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
 		}
 
 		@Override
 		public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
-			Tuple2<Integer, Integer> t2 = (Tuple2<Integer, Integer>) ts.deserialize(new ByteArrayInputView(message));
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+			Tuple2<Integer, Integer> t2 = ts.deserialize(in);
 			return new Tuple3<>(t2.f0, t2.f1, topic);
 		}
 
@@ -1103,8 +1107,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 	}
 
 	public static class FixedNumberDeserializationSchema implements DeserializationSchema<Tuple2<Integer, Integer>> {
+		
 		final int finalCount;
 		int count = 0;
+		
 		TypeInformation<Tuple2<Integer, Integer>> ti = TypeInfoParser.parse("Tuple2<Integer, Integer>");
 		TypeSerializer<Tuple2<Integer, Integer>> ser = ti.createSerializer(new ExecutionConfig());
 
@@ -1114,7 +1120,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 		@Override
 		public Tuple2<Integer, Integer> deserialize(byte[] message) throws IOException {
-			return ser.deserialize(new ByteArrayInputView(message));
+			DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+			return ser.deserialize(in);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
index 6577be8..61876e4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.util.serialization;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
 
 import java.io.IOException;
@@ -29,7 +29,6 @@ import java.io.IOException;
 /**
  * A serialization and deserialization schema that uses Flink's serialization stack to
  * transform typed from and to byte arrays.
- *
  * 
  * @param <T> The type to be serialized.
  */
@@ -42,6 +41,9 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 
 	/** The reusable output serialization buffer */
 	private transient DataOutputSerializer dos;
+	
+	/** The reusable input deserialization buffer */
+	private transient DataInputDeserializer dis;
 
 	/** The type information, to be returned by {@link #getProducedType()}. It is
 	 * transient, because it is not serializable. Note that this means that the type information
@@ -65,8 +67,14 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 	
 	@Override
 	public T deserialize(byte[] message) {
+		if (dis != null) {
+			dis.setBuffer(message, 0, message.length);
+		} else {
+			dis = new DataInputDeserializer(message, 0, message.length);
+		}
+		
 		try {
-			return serializer.deserialize(new ByteArrayInputView(message));
+			return serializer.deserialize(dis);
 		}
 		catch (IOException e) {
 			throw new RuntimeException("Unable to deserialize message", e);
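
As with the key/value schema, deserialize() now lazily creates one DataInputDeserializer on the
first call and re-points it for every subsequent message. A small round-trip sketch, assuming the
(TypeInformation, ExecutionConfig) constructor and using a String payload purely as an example:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

    public class SchemaRoundTrip {

        public static void main(String[] args) {
            TypeInformationSerializationSchema<String> schema =
                    new TypeInformationSerializationSchema<>(
                            BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());

            byte[] bytes = schema.serialize("hello");

            // repeated deserialize() calls reuse the schema's internal buffer
            String copy = schema.deserialize(bytes);
            System.out.println(copy);
        }
    }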

http://git-wip-us.apache.org/repos/asf/flink/blob/92efcd34/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
index 1c0f850..e722f53 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
@@ -112,7 +112,7 @@ public class TypeInformationSerializationSchemaTest {
 
 		@Override
 		public String toString() {
-			return String.format("MyPOJO " + aField + " " + aList);
+			return "MyPOJO " + aField + " " + aList;
 		}
 	}
 }

