flink-commits mailing list archives

From se...@apache.org
Subject [2/2] flink git commit: [FLINK-3171] [misc] Consolidate zoo of wrapper classes for input/output-stream to data-input/output-view.
Date Tue, 15 Dec 2015 10:51:16 GMT
[FLINK-3171] [misc] Consolidate zoo of wrapper classes for input/output-stream to data-input/output-view.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9a061c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9a061c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9a061c6

Branch: refs/heads/master
Commit: d9a061c674a0e64e6ec419ff614347ecc08d4eb6
Parents: 066913e
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Dec 14 11:21:57 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Dec 14 20:55:12 2015 +0100

----------------------------------------------------------------------
 .../accumulators/SerializedListAccumulator.java |  12 +-
 .../flink/api/common/io/BinaryInputFormat.java  |  38 +++--
 .../flink/api/common/io/BinaryOutputFormat.java |  48 +++---
 .../core/memory/DataInputViewStreamWrapper.java |   3 +
 .../memory/DataOutputViewStreamWrapper.java     |   3 +
 .../memory/InputViewDataInputStreamWrapper.java | 136 ----------------
 .../InputViewObjectInputStreamWrapper.java      | 126 ---------------
 .../OutputViewDataOutputStreamWrapper.java      | 124 ---------------
 .../OutputViewObjectOutputStreamWrapper.java    | 113 --------------
 .../apache/flink/util/InstantiationUtil.java    |  29 +---
 .../api/common/io/SequentialFormatTestBase.java |  26 ++--
 .../SimpleDataDistributionTest.java             |  53 +++----
 .../flink/core/testutils/CommonTestUtils.java   |  15 +-
 .../flink/types/CollectionsDataTypeTest.java    |  57 ++++---
 .../flink/types/PrimitiveDataTypeTest.java      | 101 ++++++------
 .../org/apache/flink/types/RecordITCase.java    |  23 ++-
 .../java/org/apache/flink/types/RecordTest.java | 154 ++++++++-----------
 .../types/StringValueSerializationTest.java     |  28 ++--
 .../api/java/io/CollectionInputFormat.java      |   9 +-
 ...llingAdaptiveSpanningRecordDeserializer.java |  13 +-
 .../event/IterationEventWithAggregators.java    |  23 ++-
 .../runtime/operators/util/TaskConfig.java      |  33 ++--
 .../runtime/state/filesystem/FsHeapKvState.java |   6 +-
 .../state/filesystem/FsHeapKvStateSnapshot.java |   5 +-
 .../event/EventWithAggregatorsTest.java         |  22 +--
 .../operators/util/OutputEmitterTest.java       |  38 ++---
 .../runtime/testutils/ManagementTestUtils.java  | 142 -----------------
 .../functions/source/FromElementsFunction.java  |  10 +-
 .../windowing/FoldAllWindowFunction.java        |  15 +-
 .../functions/windowing/FoldWindowFunction.java |  16 +-
 .../api/operators/StreamGroupedFold.java        |  14 +-
 31 files changed, 355 insertions(+), 1080 deletions(-)
----------------------------------------------------------------------
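The pattern this commit converges on, visible throughout the hunks below, is to wrap the raw stream directly in DataOutputViewStreamWrapper / DataInputViewStreamWrapper instead of going through an intermediate DataOutputStream / DataInputStream plus the now-removed OutputViewDataOutputStreamWrapper / InputViewDataInputStreamWrapper. A minimal sketch of a byte-array round trip with the retained wrappers, modeled on the SerializedListAccumulator and InstantiationUtil changes; the class and method names of the sketch itself are illustrative, not part of the commit.

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class WrapperRoundTripSketch {

	public static <T> byte[] serialize(T value, TypeSerializer<T> serializer) throws IOException {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		// one wrapper, no extra DataOutputStream in between
		serializer.serialize(value, new DataOutputViewStreamWrapper(bos));
		return bos.toByteArray();
	}

	public static <T> T deserialize(byte[] bytes, TypeSerializer<T> serializer) throws IOException {
		// likewise on the read side: the input stream is wrapped directly
		return serializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes)));
	}
}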


http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/main/java/org/apache/flink/api/common/accumulators/SerializedListAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/SerializedListAccumulator.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/SerializedListAccumulator.java
index 65a8c39..41df3bf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/SerializedListAccumulator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/SerializedListAccumulator.java
@@ -19,13 +19,11 @@
 package org.apache.flink.api.common.accumulators;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -53,9 +51,7 @@ public class SerializedListAccumulator<T> implements Accumulator<T, ArrayList<by
 	public void add(T value, TypeSerializer<T> serializer) throws IOException {
 		try {
 			ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-			OutputViewDataOutputStreamWrapper out = 
-					new OutputViewDataOutputStreamWrapper(new DataOutputStream(outStream));
-			
+			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(outStream);
 			serializer.serialize(value, out);
 			localValue.add(outStream.toByteArray());
 		}
@@ -93,7 +89,7 @@ public class SerializedListAccumulator<T> implements Accumulator<T, ArrayList<by
 		List<T> result = new ArrayList<T>(data.size());
 		for (byte[] bytes : data) {
 			ByteArrayInputStream inStream = new ByteArrayInputStream(bytes);
-			InputViewDataInputStreamWrapper in = new InputViewDataInputStreamWrapper(new DataInputStream(inStream));
+			DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(inStream);
 			T val = serializer.deserialize(in);
 			result.add(val);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index ad4f52a..46a5d58 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -18,15 +18,6 @@
 
 package org.apache.flink.api.common.io;
 
-import java.io.DataInputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.BlockLocation;
@@ -36,9 +27,18 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.util.StringUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks. Without
  * configuration, these block sizes equal the native block sizes of the HDFS.
@@ -63,9 +63,9 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 	 */
 	private long blockSize = NATIVE_BLOCK_SIZE;
 
-	private DataInputStream dataInputStream;
+	private transient DataInputViewStreamWrapper dataInputStream;
 
-	private BlockInfo blockInfo;
+	private transient BlockInfo blockInfo;
 
 	private long readRecords;
 
@@ -116,7 +116,7 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 			}
 		}
 
-		return inputSplits.toArray(new FileInputSplit[0]);
+		return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
 	}
 
 	protected List<FileStatus> getFiles() throws IOException {
@@ -213,9 +213,8 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 
 			FSDataInputStream fdis = file.getPath().getFileSystem().open(file.getPath(), blockInfo.getInfoSize());
 			fdis.seek(file.getLen() - blockInfo.getInfoSize());
-
-			DataInputStream input = new DataInputStream(fdis);
-			blockInfo.read(new InputViewDataInputStreamWrapper(input));
+			
+			blockInfo.read(new DataInputViewStreamWrapper(fdis));
 			totalCount += blockInfo.getAccumulatedRecordCount();
 		}
 
@@ -250,13 +249,12 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 		if (this.splitLength > this.blockInfo.getInfoSize()) {
 			// TODO: seek not supported by compressed streams. Will throw exception
 			this.stream.seek(this.splitStart + this.splitLength - this.blockInfo.getInfoSize());
-			DataInputStream infoStream = new DataInputStream(this.stream);
-			this.blockInfo.read(new InputViewDataInputStreamWrapper(infoStream));
+			this.blockInfo.read(new DataInputViewStreamWrapper(this.stream));
 		}
 
 		this.stream.seek(this.splitStart + this.blockInfo.getFirstRecordStart());
 		BlockBasedInput blockBasedInput = new BlockBasedInput(this.stream, (int) blockSize);
-		this.dataInputStream = new DataInputStream(blockBasedInput);
+		this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
 		this.readRecords = 0;
 	}
 
@@ -271,7 +269,7 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 			return null;
 		}
 		
-		record = this.deserialize(record, new InputViewDataInputStreamWrapper(this.dataInputStream));
+		record = this.deserialize(record, this.dataInputStream);
 		this.readRecords++;
 		return record;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
index 21ff372..9b4e1cf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
@@ -18,42 +18,46 @@
 
 package org.apache.flink.api.common.io;
 
-import java.io.DataOutputStream;
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
 
 public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> {
+	
 	private static final long serialVersionUID = 1L;
 	
-	/**
-	 * The config parameter which defines the fixed length of a record.
-	 */
+	/** The config parameter which defines the fixed length of a record. */
 	public static final String BLOCK_SIZE_PARAMETER_KEY = "output.block_size";
 
 	public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE;
 
-	/**
-	 * The block size to use.
-	 */
+	/** The block size to use. */
 	private long blockSize = NATIVE_BLOCK_SIZE;
 
-	private DataOutputStream dataOutputStream;
-
-	private BlockBasedOutput blockBasedInput;
+	private transient BlockBasedOutput blockBasedOutput;
+	
+	private transient DataOutputViewStreamWrapper outView;
 
+	
 	@Override
 	public void close() throws IOException {
-		this.dataOutputStream.close();
-		super.close();
+		try {
+			DataOutputViewStreamWrapper o = this.outView;
+			if (o != null) {
+				o.close();
+			}
+		}
+		finally {
+			super.close();
+		}
 	}
 	
-	protected void complementBlockInfo(BlockInfo blockInfo) throws IOException {
-	}
+	protected void complementBlockInfo(BlockInfo blockInfo) {}
 
 	@Override
 	public void configure(Configuration parameters) {
@@ -80,16 +84,16 @@ public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> {
 		final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ?
 			this.outputFilePath.getFileSystem().getDefaultBlockSize() : this.blockSize;
 
-		this.blockBasedInput = new BlockBasedOutput(this.stream, (int) blockSize);
-		this.dataOutputStream = new DataOutputStream(this.blockBasedInput);
+		this.blockBasedOutput = new BlockBasedOutput(this.stream, (int) blockSize);
+		this.outView = new DataOutputViewStreamWrapper(this.blockBasedOutput);
 	}
 
 	protected abstract void serialize(T record, DataOutputView dataOutput) throws IOException;
 
 	@Override
 	public void writeRecord(T record) throws IOException {
-		this.blockBasedInput.startRecord();
-		this.serialize(record, new OutputViewDataOutputStreamWrapper(this.dataOutputStream));
+		this.blockBasedOutput.startRecord();
+		this.serialize(record, outView);
 	}
 
 	/**
@@ -111,11 +115,11 @@ public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> {
 
 		private BlockInfo blockInfo = BinaryOutputFormat.this.createBlockInfo();
 
-		private DataOutputStream headerStream;
+		private DataOutputView headerStream;
 
 		public BlockBasedOutput(OutputStream out, int blockSize) {
 			super(out);
-			this.headerStream = new DataOutputStream(out);
+			this.headerStream = new DataOutputViewStreamWrapper(out);
 			this.maxPayloadSize = blockSize - this.blockInfo.getInfoSize();
 		}
 
@@ -170,7 +174,7 @@ public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> {
 			this.blockInfo.setAccumulatedRecordCount(this.totalCount);
 			this.blockInfo.setFirstRecordStart(this.firstRecordStartPos == NO_RECORD ? 0 : this.firstRecordStartPos);
 			BinaryOutputFormat.this.complementBlockInfo(this.blockInfo);
-			this.blockInfo.write(new OutputViewDataOutputStreamWrapper(this.headerStream));
+			this.blockInfo.write(this.headerStream);
 			this.blockPos = 0;
 			this.blockCount = 0;
 			this.firstRecordStartPos = NO_RECORD;

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
index 80affea..d5664c9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputViewStreamWrapper.java
@@ -23,6 +23,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
+/**
+ * Utility class that turns an {@link InputStream} into a {@link DataInputView}.
+ */
 public class DataInputViewStreamWrapper extends DataInputStream implements DataInputView {
 
 	public DataInputViewStreamWrapper(InputStream in) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
index efcc17e..61ad54e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java
@@ -22,6 +22,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
+/**
+ * Utility class that turns an {@link OutputStream} into a {@link DataOutputView}.
+ */
 public class DataOutputViewStreamWrapper extends DataOutputStream implements DataOutputView {
 
 	private byte[] tempBuffer;
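The Javadoc added above summarizes the role of the two remaining wrappers: they turn any InputStream / OutputStream into a DataInputView / DataOutputView. A small sketch of an IOReadableWritable round trip using them, mirroring the updated CommonTestUtils and type tests below; the stream and variable names are illustrative only.

import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.types.IntValue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class ViewWrapperUsageSketch {

	public static void main(String[] args) throws IOException {
		IntValue original = new IntValue(42);

		// write side: any OutputStream becomes a DataOutputView
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		original.write(new DataOutputViewStreamWrapper(bos));

		// read side: any InputStream becomes a DataInputView
		IntValue copy = new IntValue();
		copy.read(new DataInputViewStreamWrapper(new ByteArrayInputStream(bos.toByteArray())));

		System.out.println(copy.getValue()); // prints 42
	}
}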

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java
deleted file mode 100644
index b4dffb1..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/memory/InputViewDataInputStreamWrapper.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-
-public class InputViewDataInputStreamWrapper implements DataInputView, Closeable {
-	
-	private final DataInputStream in;
-
-	public InputViewDataInputStreamWrapper(DataInputStream in){
-		this.in = in;
-	}
-	
-	@Override
-	public void close() throws IOException {
-		in.close();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void skipBytesToRead(int numBytes) throws IOException {
-		int result = in.skipBytes(numBytes);
-
-		if(result != numBytes){
-			throw new EOFException("Could not skip " + numBytes + " bytes.");
-		}
-	}
-
-	@Override
-	public int read(byte[] b, int off, int len) throws IOException {
-		return in.read(b, off, len);
-	}
-
-	@Override
-	public int read(byte[] b) throws IOException {
-		return in.read(b);
-	}
-
-	@Override
-	public void readFully(byte[] b) throws IOException {
-		in.readFully(b);
-	}
-
-	@Override
-	public void readFully(byte[] b, int off, int len) throws IOException {
-		in.readFully(b, off, len);
-	}
-
-	@Override
-	public int skipBytes(int n) throws IOException {
-		return in.skipBytes(n);
-	}
-
-	@Override
-	public boolean readBoolean() throws IOException {
-		return in.readBoolean();
-	}
-
-	@Override
-	public byte readByte() throws IOException {
-		return in.readByte();
-	}
-
-	@Override
-	public int readUnsignedByte() throws IOException {
-		return in.readUnsignedByte();
-	}
-
-	@Override
-	public short readShort() throws IOException {
-		return in.readShort();
-	}
-
-	@Override
-	public int readUnsignedShort() throws IOException {
-		return in.readUnsignedShort();
-	}
-
-	@Override
-	public char readChar() throws IOException {
-		return in.readChar();
-	}
-
-	@Override
-	public int readInt() throws IOException {
-		return in.readInt();
-	}
-
-	@Override
-	public long readLong() throws IOException {
-		return in.readLong();
-	}
-
-	@Override
-	public float readFloat() throws IOException {
-		return in.readFloat();
-	}
-
-	@Override
-	public double readDouble() throws IOException {
-		return in.readDouble();
-	}
-	
-	@Override
-	@Deprecated
-	@SuppressWarnings("deprecation")
-	public String readLine() throws IOException {
-		return in.readLine();
-	}
-
-	@Override
-	public String readUTF() throws IOException {
-		return in.readUTF();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/main/java/org/apache/flink/core/memory/InputViewObjectInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/InputViewObjectInputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/InputViewObjectInputStreamWrapper.java
deleted file mode 100644
index b0b9c83..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/memory/InputViewObjectInputStreamWrapper.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-
-public class InputViewObjectInputStreamWrapper implements DataInputView {
-	private final ObjectInputStream in;
-
-	public InputViewObjectInputStreamWrapper(ObjectInputStream in){
-		this.in = in;
-	}
-
-	@Override
-	public void skipBytesToRead(int numBytes) throws IOException {
-		int skippedBytes = in.skipBytes(numBytes);
-
-		if(skippedBytes < numBytes){
-			throw new EOFException("Could not skip " + numBytes + " bytes.");
-		}
-	}
-
-	@Override
-	public int read(byte[] b, int off, int len) throws IOException {
-		return in.read(b, off, len);
-	}
-
-	@Override
-	public int read(byte[] b) throws IOException {
-		return in.read(b);
-	}
-
-	@Override
-	public void readFully(byte[] b) throws IOException {
-		in.readFully(b);
-	}
-
-	@Override
-	public void readFully(byte[] b, int off, int len) throws IOException {
-		in.readFully(b, off, len);
-	}
-
-	@Override
-	public int skipBytes(int n) throws IOException {
-		return in.skipBytes(n);
-	}
-
-	@Override
-	public boolean readBoolean() throws IOException {
-		return in.readBoolean();
-	}
-
-	@Override
-	public byte readByte() throws IOException {
-		return in.readByte();
-	}
-
-	@Override
-	public int readUnsignedByte() throws IOException {
-		return in.readUnsignedByte();
-	}
-
-	@Override
-	public short readShort() throws IOException {
-		return in.readShort();
-	}
-
-	@Override
-	public int readUnsignedShort() throws IOException {
-		return in.readUnsignedShort();
-	}
-
-	@Override
-	public char readChar() throws IOException {
-		return in.readChar();
-	}
-
-	@Override
-	public int readInt() throws IOException {
-		return in.readInt();
-	}
-
-	@Override
-	public long readLong() throws IOException {
-		return in.readLong();
-	}
-
-	@Override
-	public float readFloat() throws IOException {
-		return in.readFloat();
-	}
-
-	@Override
-	public double readDouble() throws IOException {
-		return in.readDouble();
-	}
-
-	@SuppressWarnings("deprecation")
-	@Override
-	public String readLine() throws IOException {
-		return in.readLine();
-	}
-
-	@Override
-	public String readUTF() throws IOException {
-		return in.readUTF();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
deleted file mode 100644
index 3be5d8b..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewDataOutputStreamWrapper.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory;
-
-import java.io.Closeable;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class OutputViewDataOutputStreamWrapper implements DataOutputView, Closeable {
-	
-	private final DataOutputStream out;
-
-	public OutputViewDataOutputStreamWrapper(DataOutputStream out){
-		this.out = out;
-	}
-	
-	
-	public void flush() throws IOException {
-		out.flush();
-	}
-	
-	@Override
-	public void close() throws IOException {
-		out.close();
-	}
-
-	@Override
-	public void skipBytesToWrite(int numBytes) throws IOException {
-		out.write(new byte[numBytes]);
-	}
-
-	@Override
-	public void write(DataInputView source, int numBytes) throws IOException {
-		byte[] buffer = new byte[numBytes];
-		source.read(buffer);
-		out.write(buffer);
-	}
-
-	@Override
-	public void write(int b) throws IOException {
-		out.write(b);
-	}
-
-	@Override
-	public void write(byte[] b) throws IOException {
-		out.write(b);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		out.write(b, off, len);
-	}
-
-	@Override
-	public void writeBoolean(boolean v) throws IOException {
-		out.writeBoolean(v);
-	}
-
-	@Override
-	public void writeByte(int v) throws IOException {
-		out.writeByte(v);
-	}
-
-	@Override
-	public void writeShort(int v) throws IOException {
-		out.writeShort(v);
-	}
-
-	@Override
-	public void writeChar(int v) throws IOException {
-		out.writeChar(v);
-	}
-
-	@Override
-	public void writeInt(int v) throws IOException {
-		out.writeInt(v);
-	}
-
-	@Override
-	public void writeLong(long v) throws IOException {
-		out.writeLong(v);
-	}
-
-	@Override
-	public void writeFloat(float v) throws IOException {
-		out.writeFloat(v);
-	}
-
-	@Override
-	public void writeDouble(double v) throws IOException {
-		out.writeDouble(v);
-	}
-
-	@Override
-	public void writeBytes(String s) throws IOException {
-		out.writeBytes(s);
-	}
-
-	@Override
-	public void writeChars(String s) throws IOException {
-		out.writeChars(s);
-	}
-
-	@Override
-	public void writeUTF(String s) throws IOException {
-		out.writeUTF(s);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java
deleted file mode 100644
index 49cc3a7..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/memory/OutputViewObjectOutputStreamWrapper.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory;
-
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-
-public class OutputViewObjectOutputStreamWrapper implements DataOutputView {
-	
-	private final ObjectOutputStream out;
-
-	public OutputViewObjectOutputStreamWrapper(ObjectOutputStream out){
-		this.out = out;
-	}
-
-	@Override
-	public void skipBytesToWrite(int numBytes) throws IOException {
-		out.write(new byte[numBytes]);
-	}
-
-	@Override
-	public void write(DataInputView source, int numBytes) throws IOException {
-		byte[] buffer = new byte[numBytes];
-		source.readFully(buffer);
-		out.write(buffer);
-	}
-
-	@Override
-	public void write(int b) throws IOException {
-		out.write(b);
-	}
-
-	@Override
-	public void write(byte[] b) throws IOException {
-		out.write(b);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		out.write(b, off, len);
-	}
-
-	@Override
-	public void writeBoolean(boolean v) throws IOException {
-		out.writeBoolean(v);
-	}
-
-	@Override
-	public void writeByte(int v) throws IOException {
-		out.writeByte(v);
-	}
-
-	@Override
-	public void writeShort(int v) throws IOException {
-		out.writeShort(v);
-	}
-
-	@Override
-	public void writeChar(int v) throws IOException {
-		out.writeChar(v);
-	}
-
-	@Override
-	public void writeInt(int v) throws IOException {
-		out.writeInt(v);
-	}
-
-	@Override
-	public void writeLong(long v) throws IOException {
-		out.writeLong(v);
-	}
-
-	@Override
-	public void writeFloat(float v) throws IOException {
-		out.writeFloat(v);
-	}
-
-	@Override
-	public void writeDouble(double v) throws IOException {
-		out.writeDouble(v);
-	}
-
-	@Override
-	public void writeBytes(String s) throws IOException {
-		out.writeBytes(s);
-	}
-
-	@Override
-	public void writeChars(String s) throws IOException {
-		out.writeChars(s);
-	}
-
-	@Override
-	public void writeUTF(String s) throws IOException {
-		out.writeUTF(s);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index b1ef35d..728cc26 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -20,13 +20,11 @@ package org.apache.flink.util;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -204,19 +202,8 @@ public final class InstantiationUtil {
 	 * @return True, if the class is a non-statically accessible inner class.
 	 */
 	public static boolean isNonStaticInnerClass(Class<?> clazz) {
-		if (clazz.getEnclosingClass() == null) {
-			// no inner class
-			return false;
-		} else {
-			// inner class
-			if (clazz.getDeclaringClass() != null) {
-				// named inner class
-				return !Modifier.isStatic(clazz.getModifiers());
-			} else {
-				// anonymous inner class
-				return true;
-			}
-		}
+		return clazz.getEnclosingClass() != null && 
+			(clazz.getDeclaringClass() == null || !Modifier.isStatic(clazz.getModifiers()));
 	}
 	
 	/**
@@ -269,10 +256,8 @@ public final class InstantiationUtil {
 		}
 
 		ByteArrayOutputStream bos = new ByteArrayOutputStream(64);
-		OutputViewDataOutputStreamWrapper outputViewWrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(bos));
-
+		DataOutputViewStreamWrapper outputViewWrapper = new DataOutputViewStreamWrapper(bos);
 		serializer.serialize(record, outputViewWrapper);
-
 		return bos.toByteArray();
 	}
 
@@ -281,7 +266,7 @@ public final class InstantiationUtil {
 			throw new NullPointerException("Byte array to deserialize from must not be null.");
 		}
 
-		InputViewDataInputStreamWrapper inputViewWrapper = new InputViewDataInputStreamWrapper(new DataInputStream(new ByteArrayInputStream(buf)));
+		DataInputViewStreamWrapper inputViewWrapper = new DataInputViewStreamWrapper(new ByteArrayInputStream(buf));
 		return serializer.deserialize(inputViewWrapper);
 	}
 
@@ -290,7 +275,7 @@ public final class InstantiationUtil {
 			throw new NullPointerException("Byte array to deserialize from must not be null.");
 		}
 
-		InputViewDataInputStreamWrapper inputViewWrapper = new InputViewDataInputStreamWrapper(new DataInputStream(new ByteArrayInputStream(buf)));
+		DataInputViewStreamWrapper inputViewWrapper = new DataInputViewStreamWrapper(new ByteArrayInputStream(buf));
 		return serializer.deserialize(reuse, inputViewWrapper);
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
index 91f80e8..2ff6fab 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
@@ -18,28 +18,28 @@
 
 package org.apache.flink.api.common.io;
 
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runners.Parameterized.Parameters;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
 /**
  * Test base for {@link org.apache.flink.api.common.io.BinaryInputFormat} and {@link org.apache.flink.api.common.io.BinaryOutputFormat}.
  */
@@ -81,10 +81,10 @@ public abstract class SequentialFormatTestBase<T> extends TestLogger {
 		int recordIndex = 0;
 		for (int fileIndex = 0; fileIndex < this.parallelism; fileIndex++) {
 			ByteCounter byteCounter = new ByteCounter();
-			DataOutputStream out = new DataOutputStream(byteCounter);
+
 			for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
-				writeRecord(this.getRecord(recordIndex), new OutputViewDataOutputStreamWrapper
-						(out));
+				writeRecord(this.getRecord(recordIndex), 
+					new DataOutputViewStreamWrapper(byteCounter));
 			}
 			this.rawDataSizes[fileIndex] = byteCounter.getLength();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java b/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java
index 6a70dfd..a58e6ab 100644
--- a/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/distributions/SimpleDataDistributionTest.java
@@ -16,26 +16,23 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.distributions;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.junit.Assert;
-
 import org.apache.flink.api.common.distributions.SimpleDistribution;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.StringValue;
+
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
 public class SimpleDataDistributionTest {
 
 	@Test
@@ -43,7 +40,7 @@ public class SimpleDataDistributionTest {
 
 		// check correct data distribution
 		try {
-			SimpleDistribution dd = new SimpleDistribution(new Key[] {new IntValue(1), new IntValue(2), new IntValue(3)});
+			SimpleDistribution dd = new SimpleDistribution(new Key<?>[] {new IntValue(1), new IntValue(2), new IntValue(3)});
 			Assert.assertEquals(1, dd.getNumberOfFields());
 		}
 		catch (Throwable t) {
@@ -52,7 +49,7 @@ public class SimpleDataDistributionTest {
 		
 		// check incorrect key types
 		try {
-			new SimpleDistribution(new Key[] {new IntValue(1), new StringValue("ABC"), new IntValue(3)});
+			new SimpleDistribution(new Key<?>[] {new IntValue(1), new StringValue("ABC"), new IntValue(3)});
 			Assert.fail("Data distribution accepts inconsistent key types");
 		} catch(IllegalArgumentException iae) {
 			// do nothing
@@ -60,7 +57,7 @@ public class SimpleDataDistributionTest {
 		
 		// check inconsistent number of keys
 		try {
-			new SimpleDistribution(new Key[][] {{new IntValue(1)}, {new IntValue(2), new IntValue(2)}, {new IntValue(3)}});
+			new SimpleDistribution(new Key<?>[][] {{new IntValue(1)}, {new IntValue(2), new IntValue(2)}, {new IntValue(3)}});
 			Assert.fail("Data distribution accepts inconsistent many keys");
 		} catch(IllegalArgumentException iae) {
 			// do nothing
@@ -72,7 +69,7 @@ public class SimpleDataDistributionTest {
 		
 		// check correct data distribution
 		SimpleDistribution dd = new SimpleDistribution(
-				new Key[][] {{new IntValue(1), new StringValue("A"), new IntValue(1)}, 
+				new Key<?>[][] {{new IntValue(1), new StringValue("A"), new IntValue(1)}, 
 							{new IntValue(2), new StringValue("A"), new IntValue(1)}, 
 							{new IntValue(3), new StringValue("A"), new IntValue(1)}});
 		Assert.assertEquals(3, dd.getNumberOfFields());
@@ -80,7 +77,7 @@ public class SimpleDataDistributionTest {
 		// check inconsistent key types
 		try {
 			new SimpleDistribution( 
-					new Key[][] {{new IntValue(1), new StringValue("A"), new DoubleValue(1.3d)}, 
+					new Key<?>[][] {{new IntValue(1), new StringValue("A"), new DoubleValue(1.3d)}, 
 								{new IntValue(2), new StringValue("B"), new IntValue(1)}});
 			Assert.fail("Data distribution accepts incorrect key types");
 		} catch(IllegalArgumentException iae) {
@@ -89,8 +86,8 @@ public class SimpleDataDistributionTest {
 		
 		// check inconsistent number of keys
 		try {
-			dd = new SimpleDistribution(
-					new Key[][] {{new IntValue(1), new IntValue(2)}, 
+			new SimpleDistribution(
+					new Key<?>[][] {{new IntValue(1), new IntValue(2)}, 
 								{new IntValue(2), new IntValue(2)}, 
 								{new IntValue(3)}});
 			Assert.fail("Data distribution accepts bucket boundaries with inconsistent many keys");
@@ -104,7 +101,7 @@ public class SimpleDataDistributionTest {
 	public void testWriteRead() {
 		
 		SimpleDistribution ddWrite = new SimpleDistribution(
-				new Key[][] {{new IntValue(1), new StringValue("A"), new IntValue(1)}, 
+				new Key<?>[][] {{new IntValue(1), new StringValue("A"), new IntValue(1)}, 
 							{new IntValue(2), new StringValue("A"), new IntValue(1)}, 
 							{new IntValue(2), new StringValue("B"), new IntValue(4)},
 							{new IntValue(2), new StringValue("B"), new IntValue(3)},
@@ -112,9 +109,9 @@ public class SimpleDataDistributionTest {
 		Assert.assertEquals(3, ddWrite.getNumberOfFields());
 		
 		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		final DataOutputStream dos = new DataOutputStream(baos);
+		
 		try {
-			ddWrite.write(new OutputViewDataOutputStreamWrapper(dos));
+			ddWrite.write(new DataOutputViewStreamWrapper(baos));
 		} catch (IOException e) {
 			Assert.fail("Error serializing the DataDistribution: " + e.getMessage());
 		}
@@ -122,12 +119,11 @@ public class SimpleDataDistributionTest {
 		byte[] seralizedDD = baos.toByteArray();
 		
 		final ByteArrayInputStream bais = new ByteArrayInputStream(seralizedDD);
-		final DataInputStream in = new DataInputStream(bais);
 		
 		SimpleDistribution ddRead = new SimpleDistribution();
 		
 		try {
-			ddRead.read(new InputViewDataInputStreamWrapper(in));
+			ddRead.read(new DataInputViewStreamWrapper(bais));
 		} catch (Exception ex) {
 			Assert.fail("The deserialization of the encoded data distribution caused an error");
 		}
@@ -149,7 +145,7 @@ public class SimpleDataDistributionTest {
 	public void testGetBucketBoundary() {
 		
 		SimpleDistribution dd = new SimpleDistribution(
-				new Key[][] {{new IntValue(1), new StringValue("A")}, 
+				new Key<?>[][] {{new IntValue(1), new StringValue("A")}, 
 							{new IntValue(2), new StringValue("B")}, 
 							{new IntValue(3), new StringValue("C")},
 							{new IntValue(4), new StringValue("D")},
@@ -202,32 +198,31 @@ public class SimpleDataDistributionTest {
 		Assert.assertTrue(((StringValue) boundRec[1]).getValue().equals("D"));
 		
 		try {
-			boundRec = dd.getBucketBoundary(0, 7);
+			dd.getBucketBoundary(0, 7);
 			Assert.fail();
 		} catch(IllegalArgumentException iae) {
 			// nothing to do
 		}
 		
 		try {
-			boundRec = dd.getBucketBoundary(3, 4);
+			dd.getBucketBoundary(3, 4);
 			Assert.fail();
 		} catch(IllegalArgumentException iae) {
 			// nothing to do
 		}
 		
 		try {
-			boundRec = dd.getBucketBoundary(-1, 4);
+			dd.getBucketBoundary(-1, 4);
 			Assert.fail();
 		} catch(IllegalArgumentException iae) {
 			// nothing to do
 		}
 		
 		try {
-			boundRec = dd.getBucketBoundary(0, 0);
+			dd.getBucketBoundary(0, 0);
 			Assert.fail();
 		} catch(IllegalArgumentException iae) {
 			// nothing to do
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
index 4dbf04c..a330c5b 100644
--- a/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -32,8 +30,8 @@ import java.io.ObjectOutputStream;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
 /**
  * This class contains reusable utility methods for unit tests.
@@ -101,9 +99,7 @@ public class CommonTestUtils {
 	public static <T extends IOReadableWritable> T createCopyWritable(final T original) throws IOException {
 
 		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		final DataOutputStream dos = new DataOutputStream(baos);
-
-		original.write(new OutputViewDataOutputStreamWrapper(dos));
+		original.write(new DataOutputViewStreamWrapper(baos));
 
 		final String className = original.getClass().getName();
 		if (className == null) {
@@ -134,10 +130,7 @@ public class CommonTestUtils {
 		}
 
 		final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-		final DataInputStream dis = new DataInputStream(bais);
-
-		copy.read(new InputViewDataInputStreamWrapper(dis));
-
+		copy.read(new DataInputViewStreamWrapper(bais));
 		return copy;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java b/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
index 5c81e4a..b61dd6e 100644
--- a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
@@ -16,11 +16,17 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.types;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
 import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
@@ -28,27 +34,17 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map.Entry;
 
-import org.junit.Assert;
-
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.junit.Before;
-import org.junit.Test;
-
 public class CollectionsDataTypeTest {
-	private DataOutputStream out;
+	
+	private DataOutputView out;
 
-	private DataInputStream in;
+	private DataInputView in;
 
 	@Before
-	public void setup() {
-		try {
-			PipedInputStream input = new PipedInputStream(1000);
-			in = new DataInputStream(input);
-			out = new DataOutputStream(new PipedOutputStream(input));
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
+	public void setup() throws Exception {
+		PipedInputStream input = new PipedInputStream(1000);
+		in = new DataInputViewStreamWrapper(input);
+		out = new DataOutputViewStreamWrapper(new PipedOutputStream(input));
 	}
 
 	@Test
@@ -66,8 +62,8 @@ public class CollectionsDataTypeTest {
 		try {
 			NfIntStringPair mPairActual = new NfIntStringPair();
 
-			pair1.write(new OutputViewDataOutputStreamWrapper(out));
-			mPairActual.read(new InputViewDataInputStreamWrapper(in));
+			pair1.write(out);
+			mPairActual.read(in);
 
 			Assert.assertEquals(pair1, mPairActual);
 		} catch (IOException e) {
@@ -190,10 +186,10 @@ public class CollectionsDataTypeTest {
 		// now test data transfer
 		NfIntStringMap nMap = new NfIntStringMap();
 		try {
-			map0.write(new OutputViewDataOutputStreamWrapper(out));
-			nMap.read(new InputViewDataInputStreamWrapper(in));
+			map0.write(out);
+			nMap.read(in);
 		} catch (Exception e) {
-			Assert.assertTrue(false);
+			Assert.fail();
 		}
 		for (Entry<IntValue, StringValue> entry : map0.entrySet()) {
 			Assert.assertEquals(entry.getKey().getValue(), Integer.parseInt(entry.getValue().toString()));
@@ -213,15 +209,14 @@ public class CollectionsDataTypeTest {
 		list.add(new StringValue("Hello3!"));
 		list.add(new StringValue("Hello4!"));
 
-		Assert.assertTrue(list.equals(list));
-
 		// test data transfer
 		NfStringList mList2 = new NfStringList();
 		try {
-			list.write(new OutputViewDataOutputStreamWrapper(out));
-			mList2.read(new InputViewDataInputStreamWrapper(in));
-		} catch (Exception e) {
-			Assert.assertTrue(false);
+			list.write(out);
+			mList2.read(in);
+		}
+		catch (Exception e) {
+			Assert.fail();
 		}
 		Assert.assertTrue(list.equals(mList2));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/test/java/org/apache/flink/types/PrimitiveDataTypeTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/PrimitiveDataTypeTest.java b/flink-core/src/test/java/org/apache/flink/types/PrimitiveDataTypeTest.java
index efeac4d..83e192c 100644
--- a/flink-core/src/test/java/org/apache/flink/types/PrimitiveDataTypeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/PrimitiveDataTypeTest.java
@@ -16,41 +16,35 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.types;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
 import org.junit.Assert;
-
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+
 public class PrimitiveDataTypeTest {
 
-	private DataOutputStream mOut;
+	private PipedInputStream in;
+	private PipedOutputStream out;
 
-	private DataInputStream mIn;
+	private DataInputView mIn;
+	private DataOutputView mOut;
 
 	@Before
-	public void setup() {
-		try {
-			PipedInputStream input = new PipedInputStream(1000);
-			mIn = new DataInputStream(input);
-			mOut = new DataOutputStream(new PipedOutputStream(input));
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
+	public void setup() throws Exception {
+		in = new PipedInputStream(1000);
+		out = new PipedOutputStream(in);
+		mIn = new DataInputViewStreamWrapper(in);
+		mOut = new DataOutputViewStreamWrapper(out);
 	}
 
 	@Test
@@ -68,15 +62,15 @@ public class PrimitiveDataTypeTest {
 		Assert.assertEquals(int0.compareTo(int3), -1);
 		// test stream output and retrieval
 		try {
-			int0.write(new OutputViewDataOutputStreamWrapper(mOut));
-			int2.write(new OutputViewDataOutputStreamWrapper(mOut));
-			int3.write(new OutputViewDataOutputStreamWrapper(mOut));
+			int0.write(mOut);
+			int2.write(mOut);
+			int3.write(mOut);
 			IntValue int1n = new IntValue();
 			IntValue int2n = new IntValue();
 			IntValue int3n = new IntValue();
-			int1n.read(new InputViewDataInputStreamWrapper(mIn));
-			int2n.read(new InputViewDataInputStreamWrapper(mIn));
-			int3n.read(new InputViewDataInputStreamWrapper(mIn));
+			int1n.read(mIn);
+			int2n.read(mIn);
+			int3n.read(mIn);
 			Assert.assertEquals(int0.compareTo(int1n), 0);
 			Assert.assertEquals(int0.getValue(), int1n.getValue());
 			Assert.assertEquals(int2.compareTo(int2n), 0);
@@ -104,15 +98,15 @@ public class PrimitiveDataTypeTest {
 		Assert.assertEquals(double0.compareTo(double3), -1);
 		// test stream output and retrieval
 		try {
-			double0.write(new OutputViewDataOutputStreamWrapper(mOut));
-			double2.write(new OutputViewDataOutputStreamWrapper(mOut));
-			double3.write(new OutputViewDataOutputStreamWrapper(mOut));
+			double0.write(mOut);
+			double2.write(mOut);
+			double3.write(mOut);
 			DoubleValue double1n = new DoubleValue();
 			DoubleValue double2n = new DoubleValue();
 			DoubleValue double3n = new DoubleValue();
-			double1n.read(new InputViewDataInputStreamWrapper(mIn));
-			double2n.read(new InputViewDataInputStreamWrapper(mIn));
-			double3n.read(new InputViewDataInputStreamWrapper(mIn));
+			double1n.read(mIn);
+			double2n.read(mIn);
+			double3n.read(mIn);
 			Assert.assertEquals(double0.compareTo(double1n), 0);
 			Assert.assertEquals(double0.getValue(), double1n.getValue(), 0.0001);
 			Assert.assertEquals(double2.compareTo(double2n), 0);
@@ -164,25 +158,27 @@ public class PrimitiveDataTypeTest {
 		try {
 			string7.charAt(5);
 			Assert.fail("Exception should have been thrown when accessing characters out of bounds.");
-		} catch (IndexOutOfBoundsException iOOBE) {}
+		} catch (IndexOutOfBoundsException iOOBE) {
+			// expected
+		}
 		
 		// test stream out/input
 		try {
-			string0.write(new OutputViewDataOutputStreamWrapper(mOut));
-			string4.write(new OutputViewDataOutputStreamWrapper(mOut));
-			string2.write(new OutputViewDataOutputStreamWrapper(mOut));
-			string3.write(new OutputViewDataOutputStreamWrapper(mOut));
-			string7.write(new OutputViewDataOutputStreamWrapper(mOut));
+			string0.write(mOut);
+			string4.write(mOut);
+			string2.write(mOut);
+			string3.write(mOut);
+			string7.write(mOut);
 			StringValue string1n = new StringValue();
 			StringValue string2n = new StringValue();
 			StringValue string3n = new StringValue();
 			StringValue string4n = new StringValue();
 			StringValue string7n = new StringValue();
-			string1n.read(new InputViewDataInputStreamWrapper(mIn));
-			string4n.read(new InputViewDataInputStreamWrapper(mIn));
-			string2n.read(new InputViewDataInputStreamWrapper(mIn));
-			string3n.read(new InputViewDataInputStreamWrapper(mIn));
-			string7n.read(new InputViewDataInputStreamWrapper(mIn));
+			string1n.read(mIn);
+			string4n.read(mIn);
+			string2n.read(mIn);
+			string3n.read(mIn);
+			string7n.read(mIn);
 			Assert.assertEquals(string0.compareTo(string1n), 0);
 			Assert.assertEquals(string0.toString(), string1n.toString());
 			Assert.assertEquals(string4.compareTo(string4n), 0);
@@ -196,7 +192,10 @@ public class PrimitiveDataTypeTest {
 			try {
 				string7n.charAt(5);
 				Assert.fail("Exception should have been thrown when accessing characters out of bounds.");
-			} catch (IndexOutOfBoundsException iOOBE) {}
+			}
+			catch (IndexOutOfBoundsException iOOBE) {
+				// expected
+			}
 			
 		} catch (Exception e) {
 			Assert.assertTrue(false);
@@ -221,19 +220,19 @@ public class PrimitiveDataTypeTest {
 		try {
 			// write it multiple times
 			for (int i = 0; i < numWrites; i++) {
-				pn.write(new OutputViewDataOutputStreamWrapper(mOut));
+				pn.write(mOut);
 			}
 			
 			// read it multiple times
 			for (int i = 0; i < numWrites; i++) {
-				pn.read(new InputViewDataInputStreamWrapper(mIn));
+				pn.read(mIn);
 			}
 			
-			Assert.assertEquals("Reading PactNull does not consume the same data as was written.", mIn.available(), 0);
+			
+			Assert.assertEquals("Reading PactNull does not consume the same data as was written.", 0, in.available());
 		}
 		catch (IOException ioex) {
 			Assert.fail("An exception occurred in the testcase: " + ioex.getMessage());
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/test/java/org/apache/flink/types/RecordITCase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/RecordITCase.java b/flink-core/src/test/java/org/apache/flink/types/RecordITCase.java
index 09e3516..4db755f 100644
--- a/flink-core/src/test/java/org/apache/flink/types/RecordITCase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/RecordITCase.java
@@ -16,16 +16,17 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.types;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.util.Random;
 
-import org.apache.flink.types.Value;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,20 +36,18 @@ public class RecordITCase {
 	private static final long SEED = 354144423270432543L;
 	private final Random rand = new Random(RecordITCase.SEED);
 	
-	private DataInputStream in;
-	private DataOutputStream out;
+	private DataInputView in;
+	private DataOutputView out;
 
 	@Before
-	public void setUp() throws Exception
-	{
+	public void setUp() throws Exception {
 		PipedInputStream pipedInput = new PipedInputStream(32*1024*1024);
-		this.in = new DataInputStream(pipedInput);
-		this.out = new DataOutputStream(new PipedOutputStream(pipedInput));
+		this.in = new DataInputViewStreamWrapper(pipedInput);
+		this.out = new DataOutputViewStreamWrapper(new PipedOutputStream(pipedInput));
 	}
 	
 	@Test
-	public void massiveRandomBlackBoxTests()
-	{
+	public void massiveRandomBlackBoxTests() {
 		try {
 			// random test with records with a small number of fields
 			for (int i = 0; i < 100000; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/RecordTest.java b/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
index 9ba45a7..d7e3edd 100644
--- a/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
@@ -16,29 +16,22 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.types;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.util.Arrays;
 import java.util.Random;
 
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,8 +41,8 @@ public class RecordTest {
 	private static final long SEED = 354144423270432543L;
 	private final Random rand = new Random(RecordTest.SEED);
 	
-	private DataInputStream in;
-	private DataOutputStream out;
+	private DataInputView in;
+	private DataOutputView out;
 	
 	// Couple of test values
 	private final StringValue origVal1 = new StringValue("Hello World!");
@@ -59,28 +52,28 @@ public class RecordTest {
 	
 
 	@Before
-	public void setUp() throws Exception
-	{
-		PipedInputStream pipedInput = new PipedInputStream(1024*1024);
-		this.in = new DataInputStream(pipedInput);
-		this.out = new DataOutputStream(new PipedOutputStream(pipedInput));
+	public void setUp() throws Exception {
+		PipedInputStream pipeIn = new PipedInputStream(1024*1024);
+		PipedOutputStream pipeOut = new PipedOutputStream(pipeIn);
+		
+		this.in = new DataInputViewStreamWrapper(pipeIn);
+		this.out = new DataOutputViewStreamWrapper(pipeOut);
 	}
 	
 	@Test
-	public void testEmptyRecordSerialization()
-	{
+	public void testEmptyRecordSerialization() {
 		try {
 			// test deserialize into self
 			Record empty = new Record();
-			empty.write(new OutputViewDataOutputStreamWrapper(this.out));
-			empty.read(new InputViewDataInputStreamWrapper(this.in));
+			empty.write(this.out);
+			empty.read(in);
 			Assert.assertTrue("Deserialized Empty record is not another empty record.", empty.getNumFields() == 0);
 			
 			// test deserialize into new
 			empty = new Record();
-			empty.write(new OutputViewDataOutputStreamWrapper(this.out));
+			empty.write(this.out);
 			empty = new Record();
-			empty.read(new InputViewDataInputStreamWrapper(this.in));
+			empty.read(this.in);
 			Assert.assertTrue("Deserialized Empty record is not another empty record.", empty.getNumFields() == 0);
 			
 		} catch (Throwable t) {
@@ -170,7 +163,7 @@ public class RecordTest {
 //	}
 
 	@Test
-	public void testRemoveField() {		
+	public void testRemoveField() {
 		Record record = null;
 		int oldLen = 0;
 
@@ -271,8 +264,7 @@ public class RecordTest {
 //	}
 
 	@Test
-	public void testSetNullInt()
-	{
+	public void testSetNullInt() {
 		try {
 			Record record = this.generateFilledDenseRecord(58);
 	
@@ -285,8 +277,7 @@ public class RecordTest {
 	}
 
 	@Test
-	public void testSetNullLong()
-	{
+	public void testSetNullLong() {
 		try {
 			Record record = this.generateFilledDenseRecord(58);
 			long mask = generateRandomBitmask(58);
@@ -395,18 +386,18 @@ public class RecordTest {
 	
 			try {
 				// serialize and deserialize to remove all buffered info
-				r.write(new OutputViewDataOutputStreamWrapper(out));
+				r.write(this.out);
 				r = new Record();
-				r.read(new InputViewDataInputStreamWrapper(in));
+				r.read(this.in);
 	
 				r.setField(1, new IntValue(10));
 				r.setField(4, new StringValue("Some long value"));
 				r.setField(5, new StringValue("An even longer value"));
 				r.setField(10, new IntValue(10));
 	
-				r.write(new OutputViewDataOutputStreamWrapper(out));
+				r.write(this.out);
 				r = new Record();
-				r.read(new InputViewDataInputStreamWrapper(in));
+				r.read(this.in);
 	
 				assertTrue(r.getField(0, IntValue.class).getValue() == 0);
 				assertTrue(r.getField(1, IntValue.class).getValue() == 10);
@@ -420,10 +411,8 @@ public class RecordTest {
 				assertTrue(r.getField(9, IntValue.class) == null);
 				assertTrue(r.getField(10, IntValue.class).getValue() == 10);
 	
-			} catch (RuntimeException re) {
+			} catch (RuntimeException | IOException re) {
 				fail("Error updating binary representation: " + re.getMessage());
-			} catch (IOException e) {
-				fail("Error updating binary representation: " + e.getMessage());
 			}
 		} catch (Throwable t) {
 			Assert.fail("Test failed due to an exception: " + t.getMessage());
@@ -440,8 +429,8 @@ public class RecordTest {
 			Record record2 = new Record();
 			try {
 				// De/Serialize the record
-				record1.write(new OutputViewDataOutputStreamWrapper(this.out));
-				record2.read(new InputViewDataInputStreamWrapper(this.in));
+				record1.write(this.out);
+				record2.read(this.in);
 	
 				assertTrue(record1.getNumFields() == record2.getNumFields());
 	
@@ -469,20 +458,20 @@ public class RecordTest {
 		try {
 			Record record = new Record(new IntValue(42));
 	
-			record.write(new OutputViewDataOutputStreamWrapper(out));
+			record.write(this.out);
 			Assert.assertEquals(42, record.getField(0, IntValue.class).getValue());
 	
 			record.setField(0, new IntValue(23));
-			record.write(new OutputViewDataOutputStreamWrapper(out));
+			record.write(this.out);
 			Assert.assertEquals(23, record.getField(0, IntValue.class).getValue());
 	
 			record.clear();
 			Assert.assertEquals(0, record.getNumFields());
 	
 			Record record2 = new Record(new IntValue(42));
-			record2.read(new InputViewDataInputStreamWrapper(in));
+			record2.read(in);
 			Assert.assertEquals(42, record2.getField(0, IntValue.class).getValue());
-			record2.read(new InputViewDataInputStreamWrapper(in));
+			record2.read(in);
 			Assert.assertEquals(23, record2.getField(0, IntValue.class).getValue());
 		} catch (Throwable t) {
 			Assert.fail("Test failed due to an exception: " + t.getMessage());
@@ -533,9 +522,9 @@ public class RecordTest {
 				// two very long fields
 				{new StringValue(createRandomString(this.rand, 1265)), null, new StringValue(createRandomString(this.rand, 855))}
 			};
-			
-			for (int i = 0; i < values.length; i++) {
-				blackboxTestRecordWithValues(values[i], this.rand, this.in, this.out);
+
+			for (Value[] value : values) {
+				blackboxTestRecordWithValues(value, this.rand, this.in, this.out);
 			}
 			
 			// random test with records with a small number of fields
@@ -554,8 +543,8 @@ public class RecordTest {
 		}
 	}
 	
-	static final void blackboxTestRecordWithValues(Value[] values, Random rnd, DataInputStream reader,
-												   DataOutputStream writer)
+	static void blackboxTestRecordWithValues(Value[] values, Random rnd, DataInputView reader,
+												   DataOutputView writer)
 	throws Exception
 	{
 		final int[] permutation1 = createPermutation(rnd, values.length);
@@ -600,9 +589,9 @@ public class RecordTest {
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
-		rec.write(new OutputViewDataOutputStreamWrapper(writer));
+		rec.write(writer);
 		rec = new Record();
-		rec.read(new InputViewDataInputStreamWrapper(reader));
+		rec.read(reader);
 		testAllRetrievalMethods(rec, permutation2, values);
 		
 		// test adding and retrieving with full stream serialization and deserialization into the same record
@@ -611,8 +600,8 @@ public class RecordTest {
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
-		rec.write(new OutputViewDataOutputStreamWrapper(writer));
-		rec.read(new InputViewDataInputStreamWrapper(reader));
+		rec.write(writer);
+		rec.read(reader);
 		testAllRetrievalMethods(rec, permutation2, values);
 		
 		// test adding and retrieving with partial stream serialization and deserialization into a new record
@@ -620,18 +609,18 @@ public class RecordTest {
 		updatePos = rnd.nextInt(values.length + 1);
 		for (int i = 0; i < values.length; i++) {
 			if (i == updatePos) {
-				rec.write(new OutputViewDataOutputStreamWrapper(writer));
+				rec.write(writer);
 				rec = new Record();
-				rec.read(new InputViewDataInputStreamWrapper(reader));
+				rec.read(reader);
 			}
 			
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
 		if (updatePos == values.length) {
-			rec.write(new OutputViewDataOutputStreamWrapper(writer));
+			rec.write(writer);
 			rec = new Record();
-			rec.read(new InputViewDataInputStreamWrapper(reader));
+			rec.read(reader);
 		}
 		testAllRetrievalMethods(rec, permutation2, values);
 		
@@ -640,16 +629,16 @@ public class RecordTest {
 		updatePos = rnd.nextInt(values.length + 1);
 		for (int i = 0; i < values.length; i++) {
 			if (i == updatePos) {
-				rec.write(new OutputViewDataOutputStreamWrapper(writer));
-				rec.read(new InputViewDataInputStreamWrapper(reader));
+				rec.write(writer);
+				rec.read(reader);
 			}
 			
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
 		if (updatePos == values.length) {
-			rec.write(new OutputViewDataOutputStreamWrapper(writer));
-			rec.read(new InputViewDataInputStreamWrapper(reader));
+			rec.write(writer);
+			rec.read(reader);
 		}
 		testAllRetrievalMethods(rec, permutation2, values);
 
@@ -658,17 +647,17 @@ public class RecordTest {
 		updatePos = rnd.nextInt(values.length + 1);
 		for (int i = 0; i < values.length; i++) {
 			if (i == updatePos) {
-				rec.write(new OutputViewDataOutputStreamWrapper(writer));
+				rec.write(writer);
 				rec = new Record();
-				rec.read(new InputViewDataInputStreamWrapper(reader));
+				rec.read(reader);
 			}
 			
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
-		rec.write(new OutputViewDataOutputStreamWrapper(writer));
+		rec.write(writer);
 		rec = new Record();
-		rec.read(new InputViewDataInputStreamWrapper(reader));
+		rec.read(reader);
 		testAllRetrievalMethods(rec, permutation2, values);
 		
 		// test adding and retrieving with partial stream serialization and deserialization into the same record
@@ -676,21 +665,19 @@ public class RecordTest {
 		updatePos = rnd.nextInt(values.length + 1);
 		for (int i = 0; i < values.length; i++) {
 			if (i == updatePos) {
-				rec.write(new OutputViewDataOutputStreamWrapper(writer));
-				rec.read(new InputViewDataInputStreamWrapper(reader));
+				rec.write(writer);
+				rec.read(reader);
 			}
 			
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
-		rec.write(new OutputViewDataOutputStreamWrapper(writer));
-		rec.read(new InputViewDataInputStreamWrapper(reader));
+		rec.write(writer);
+		rec.read(reader);
 		testAllRetrievalMethods(rec, permutation2, values);
 	}
 	
-	public static final void testAllRetrievalMethods(Record rec, int[] permutation, Value[] expected)
-	throws Exception
-	{
+	public static void testAllRetrievalMethods(Record rec, int[] permutation, Value[] expected) throws Exception {
 		// test getField(int, Class)
 		for (int i = 0; i < expected.length; i++) {
 			final int pos = permutation[i];
@@ -750,8 +737,7 @@ public class RecordTest {
 	}
 
 	@Test
-	public void testUnionFields()
-	{
+	public void testUnionFields() {
 		try {
 			final Value[][] values = new Value[][] {
 				{new IntValue(56), null, new IntValue(-7628761)},
@@ -776,8 +762,7 @@ public class RecordTest {
 		}
 	}
 	
-	private void testUnionFieldsForValues(Value[] rec1fields, Value[] rec2fields, Random rnd)
-	{
+	private void testUnionFieldsForValues(Value[] rec1fields, Value[] rec2fields, Random rnd) {
 		// fully in binary sync
 		Record rec1 = createRecord(rec1fields);
 		Record rec2 = createRecord(rec2fields);
@@ -808,7 +793,6 @@ public class RecordTest {
 		
 		// both partially in binary sync
 		rec1 = new Record();
-		rec1 = new Record();
 		
 		int[] permutation1 = createPermutation(rnd, rec1fields.length);
 		int[] permutation2 = createPermutation(rnd, rec2fields.length);
@@ -843,8 +827,7 @@ public class RecordTest {
 		checkUnionedRecord(rec1, rec1fields, rec2fields);
 	}
 	
-	private static final void checkUnionedRecord(Record union, Value[] rec1fields, Value[] rec2fields)
-	{
+	private static void checkUnionedRecord(Record union, Value[] rec1fields, Value[] rec2fields) {
 		for (int i = 0; i < Math.max(rec1fields.length, rec2fields.length); i++) {
 			// determine the expected value from the value arrays
 			final Value expected;
@@ -875,8 +858,7 @@ public class RecordTest {
 	//                                       Utilities
 	// --------------------------------------------------------------------------------------------
 	
-	public static final Record createRecord(Value[] fields)
-	{
+	public static Record createRecord(Value[] fields) {
 		final Record rec = new Record();
 		for (int i = 0; i < fields.length; i++) {
 			rec.setField(i, fields[i]);
@@ -884,8 +866,7 @@ public class RecordTest {
 		return rec;
 	}
 	
-	public static final Value[] createRandomValues(Random rnd, int minNum, int maxNum)
-	{
+	public static Value[] createRandomValues(Random rnd, int minNum, int maxNum) {
 		final int numFields = rnd.nextInt(maxNum - minNum + 1) + minNum;
 		final Value[] values = new Value[numFields];
 		
@@ -917,13 +898,11 @@ public class RecordTest {
 		return values;
 	}
 
-	public static String createRandomString(Random rnd)
-	{
+	public static String createRandomString(Random rnd) {
 		return createRandomString(rnd, rnd.nextInt(150));
 	}
 	
-	public static String createRandomString(Random rnd, int length)
-	{
+	public static String createRandomString(Random rnd, int length) {
 		final StringBuilder sb = new StringBuilder();
 		sb.ensureCapacity(length);
 		
@@ -933,8 +912,7 @@ public class RecordTest {
 		return sb.toString();
 	}
 	
-	public static int[] createPermutation(Random rnd, int length)
-	{
+	public static int[] createPermutation(Random rnd, int length) {
 		final int[] a = new int[length];
 		for (int i = 0; i < length; i++) {
 			a[i] = i;

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-core/src/test/java/org/apache/flink/types/StringValueSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/StringValueSerializationTest.java b/flink-core/src/test/java/org/apache/flink/types/StringValueSerializationTest.java
index f64ced5..e0ece24 100644
--- a/flink-core/src/test/java/org/apache/flink/types/StringValueSerializationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/StringValueSerializationTest.java
@@ -23,15 +23,13 @@ import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Random;
 
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.types.StringValue;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.StringUtils;
+
 import org.junit.Test;
 
 /**
@@ -121,9 +119,9 @@ public class StringValueSerializationTest {
 		}
 	}
 	
-	public static final void testSerialization(String[] values) throws IOException {
+	public static void testSerialization(String[] values) throws IOException {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-		OutputViewDataOutputStreamWrapper serializer = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
+		DataOutputViewStreamWrapper serializer = new DataOutputViewStreamWrapper(baos);
 		
 		for (String value : values) {
 			StringValue sv = new StringValue(value);
@@ -134,7 +132,7 @@ public class StringValueSerializationTest {
 		baos.close();
 		
 		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-		InputViewDataInputStreamWrapper deserializer = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
+		DataInputViewStreamWrapper deserializer = new DataInputViewStreamWrapper(bais);
 		
 		int num = 0;
 		while (bais.available() > 0) {
@@ -148,9 +146,9 @@ public class StringValueSerializationTest {
 		assertEquals("Wrong number of deserialized values", values.length, num);
 	}
 
-	public static final void testCopy(String[] values) throws IOException {
+	public static void testCopy(String[] values) throws IOException {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
-		OutputViewDataOutputStreamWrapper serializer = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
+		DataOutputViewStreamWrapper serializer = new DataOutputViewStreamWrapper(baos);
 		
 		StringValue sValue = new StringValue();
 		
@@ -163,17 +161,17 @@ public class StringValueSerializationTest {
 		baos.close();
 		
 		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-		InputViewDataInputStreamWrapper source = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
+		DataInputViewStreamWrapper source = new DataInputViewStreamWrapper(bais);
 		
 		ByteArrayOutputStream targetOutput = new ByteArrayOutputStream(4096);
-		OutputViewDataOutputStreamWrapper target = new OutputViewDataOutputStreamWrapper(new DataOutputStream(targetOutput));
-		
-		for (int i = 0; i < values.length; i++) {
+		DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(targetOutput);
+
+		for (String value : values) {
 			sValue.copy(source, target);
 		}
 		
 		ByteArrayInputStream validateInput = new ByteArrayInputStream(targetOutput.toByteArray());
-		InputViewDataInputStreamWrapper validate = new InputViewDataInputStreamWrapper(new DataInputStream(validateInput));
+		DataInputViewStreamWrapper validate = new DataInputViewStreamWrapper(validateInput);
 		
 		int num = 0;
 		while (validateInput.available() > 0) {
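
testCopy() above moves the serialized form from a source view to a target view without materializing the strings, via StringValue#copy(source, target). Roughly, outside the test harness (buffers and values are invented; this is only a sketch of the copy path over the new wrappers):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    import org.apache.flink.types.StringValue;

    public class StringValueCopySketch {

        public static void main(String[] args) throws Exception {
            // serialize two values into a source buffer
            ByteArrayOutputStream sourceBytes = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(sourceBytes);
            new StringValue("first").write(out);
            new StringValue("second").write(out);

            // copy their binary form to a target buffer without deserializing
            DataInputViewStreamWrapper source =
                    new DataInputViewStreamWrapper(new ByteArrayInputStream(sourceBytes.toByteArray()));
            ByteArrayOutputStream targetBytes = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper target = new DataOutputViewStreamWrapper(targetBytes);

            StringValue reusable = new StringValue();
            reusable.copy(source, target);
            reusable.copy(source, target);

            // the copied bytes deserialize back to the original values
            DataInputViewStreamWrapper validate =
                    new DataInputViewStreamWrapper(new ByteArrayInputStream(targetBytes.toByteArray()));
            reusable.read(validate);
            System.out.println(reusable.getValue());   // first
            reusable.read(validate);
            System.out.println(reusable.getValue());   // second
        }
    }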

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index 1df995b..2ac19ba 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.java.io;
 
 import java.io.IOException;
@@ -31,8 +30,8 @@ import org.apache.flink.api.common.io.GenericInputFormat;
 import org.apache.flink.api.common.io.NonParallelInput;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.memory.InputViewObjectInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewObjectOutputStreamWrapper;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
 /**
  * An input format that returns objects from a collection.
@@ -83,7 +82,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		out.writeInt(size);
 		
 		if (size > 0) {
-			OutputViewObjectOutputStreamWrapper wrapper = new OutputViewObjectOutputStreamWrapper(out);
+			DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out);
 			for (T element : dataSet){
 				serializer.serialize(element, wrapper);
 			}
@@ -98,7 +97,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		
 		if (collectionLength > 0) {
 			try {
-				InputViewObjectInputStreamWrapper wrapper = new InputViewObjectInputStreamWrapper(in);
+				DataInputViewStreamWrapper wrapper = new DataInputViewStreamWrapper(in);
 				for (int i = 0; i < collectionLength; i++){
 					T element = serializer.deserialize(wrapper);
 					list.add(element);

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index b10e5a8..6a4692e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.util.StringUtils;
 
 import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
@@ -402,7 +401,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 					if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
 						throw new UTFDataFormatException("malformed input around byte " + (count - 1));
 					}
-					chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+					chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
 					break;
 				default:
 					throw new UTFDataFormatException("malformed input around byte " + count);
@@ -489,7 +488,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 		
 		private File spillFile;
 		
-		private InputViewDataInputStreamWrapper spillFileReader;
+		private DataInputViewStreamWrapper spillFileReader;
 
 		private AccumulatorRegistry.Reporter reporter;
 
@@ -590,9 +589,9 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 				}
 				else {
 					spillingChannel.close();
-					
-					DataInputStream inStream = new DataInputStream(new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024));
-					this.spillFileReader = new InputViewDataInputStreamWrapper(inStream);
+
+					BufferedInputStream inStream = new BufferedInputStream(new FileInputStream(spillFile), 2 * 1024 * 1024);
+					this.spillFileReader = new DataInputViewStreamWrapper(inStream);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
index e259523..4e1c19e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/event/IterationEventWithAggregators.java
@@ -16,21 +16,18 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.event;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Map;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.InstantiationUtil;
@@ -101,11 +98,13 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 					throw new RuntimeException("User-defined aggregator class is not a value sublass.");
 				}
 				
-				DataInputStream in = new DataInputStream(new ByteArrayInputStream(serializedData[i]));
-				try {
-					v.read(new InputViewDataInputStreamWrapper(in));
-					in.close();
-				} catch (IOException e) {
+				
+				try (DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(
+					new ByteArrayInputStream(serializedData[i])))
+				{
+					v.read(in);
+				}
+				catch (IOException e) {
 					throw new RuntimeException("Error while deserializing the user-defined aggregate class.", e);
 				}
 				
@@ -122,7 +121,7 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 		out.writeInt(num);
 		
 		ByteArrayOutputStream boas = new ByteArrayOutputStream();
-		DataOutputStream bufferStream = new DataOutputStream(boas);
+		DataOutputViewStreamWrapper bufferStream = new DataOutputViewStreamWrapper(boas);
 		
 		for (int i = 0; i < num; i++) {
 			// aggregator name and type
@@ -130,7 +129,7 @@ public abstract class IterationEventWithAggregators extends TaskEvent {
 			out.writeUTF(this.aggregates[i].getClass().getName());
 			
 			// aggregator value indirect as a byte array
-			this.aggregates[i].write(new OutputViewDataOutputStreamWrapper(bufferStream));
+			this.aggregates[i].write(bufferStream);
 			bufferStream.flush();
 			byte[] bytes = boas.toByteArray();
 			out.writeInt(bytes.length);

http://git-wip-us.apache.org/repos/asf/flink/blob/d9a061c6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index c32c43b..2c45293 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -16,13 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.util;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -44,9 +41,9 @@ import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
@@ -58,6 +55,9 @@ import org.apache.flink.util.InstantiationUtil;
  * Configuration class which stores all relevant parameters required to set up the Pact tasks.
  */
 public class TaskConfig implements Serializable {
+
+	private static final long serialVersionUID = -2498884325640066272L;
+	
 	
 	private static final String TASK_NAME = "taskname";
 	
@@ -558,15 +558,16 @@ public class TaskConfig implements Serializable {
 	public void setOutputDataDistribution(DataDistribution distribution, int outputNum) {
 		this.config.setString(OUTPUT_DATA_DISTRIBUTION_CLASS, distribution.getClass().getName());
 		
-		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		final DataOutputStream dos = new DataOutputStream(baos);
-		try {
-			distribution.write(new OutputViewDataOutputStreamWrapper(dos));
-		} catch (IOException e) {
+		try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+				DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos)) {
+			
+			distribution.write(out);
+			config.setBytes(OUTPUT_DATA_DISTRIBUTION_PREFIX + outputNum, baos.toByteArray());
+			
+		}
+		catch (IOException e) {
 			throw new RuntimeException("Error serializing the DataDistribution: " + e.getMessage(), e);
 		}
-
-		this.config.setBytes(OUTPUT_DATA_DISTRIBUTION_PREFIX + outputNum, baos.toByteArray());
 	}
 	
 	public DataDistribution getOutputDataDistribution(int outputNum, final ClassLoader cl) throws ClassNotFoundException {
@@ -577,7 +578,7 @@ public class TaskConfig implements Serializable {
 		
 		final Class<? extends DataDistribution> clazz;
 		try {
-			clazz = (Class<? extends DataDistribution>) Class.forName(className, true, cl).asSubclass(DataDistribution.class);
+			clazz = Class.forName(className, true, cl).asSubclass(DataDistribution.class);
 		} catch (ClassCastException ccex) {
 			throw new CorruptConfigurationException("The class noted in the configuration as the data distribution " +
 					"is no subclass of DataDistribution.");
@@ -592,14 +593,14 @@ public class TaskConfig implements Serializable {
 		}
 		
 		final ByteArrayInputStream bais = new ByteArrayInputStream(stateEncoded);
-		final DataInputStream in = new DataInputStream(bais);
+		final DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
 		
 		try {
-			distribution.read(new InputViewDataInputStreamWrapper(in));
+			distribution.read(in);
 			return distribution;
 		} catch (Exception ex) {
 			throw new RuntimeException("The deserialization of the encoded data distribution state caused an error"
-				+ ex.getMessage() == null ? "." : ": " + ex.getMessage(), ex);
+				+ (ex.getMessage() == null ? "." : ": " + ex.getMessage()), ex);
 		}
 	}
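
setOutputDataDistribution() now serializes through a try-with-resources block; both wrappers are closeable, so the same shape works for any value that writes to a DataOutputView. A minimal sketch of that pattern (IntValue is used only because it is small and self-contained; everything else is invented for illustration):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    import org.apache.flink.types.IntValue;

    public class TryWithResourcesSketch {

        public static void main(String[] args) {
            byte[] encoded;

            // write side: the wrapper and the buffer are closed automatically
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos)) {

                new IntValue(42).write(out);
                out.flush();
                encoded = baos.toByteArray();
            }
            catch (IOException e) {
                throw new RuntimeException("Error serializing the value: " + e.getMessage(), e);
            }

            // read side: same pattern with the input wrapper
            try (DataInputViewStreamWrapper in =
                    new DataInputViewStreamWrapper(new ByteArrayInputStream(encoded))) {

                IntValue value = new IntValue();
                value.read(in);
                System.out.println(value.getValue());   // 42
            }
            catch (IOException e) {
                throw new RuntimeException("Error deserializing the value: " + e.getMessage(), e);
            }
        }
    }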
 	

