flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/5] [FLINK-994] Replaced DataInput and DataOutput with DataInputView and DataOutputView
Date Tue, 08 Jul 2014 12:18:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java b/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
index 73671fa..78e90f0 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/types/Record.java
@@ -1070,7 +1070,7 @@ public final class Record implements Value, CopyableValue<Record> {
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public void write(DataOutput out) throws IOException {
+	public void write(DataOutputView out) throws IOException {
 		// make sure everything is in a valid binary representation
 		updateBinaryRepresenation();
 		
@@ -1080,7 +1080,7 @@ public final class Record implements Value, CopyableValue<Record> {
 	}
 
 	@Override
-	public void read(DataInput in) throws IOException {
+	public void read(DataInputView in) throws IOException {
 		final int len = readVarLengthInt(in);
 		this.binaryLen = len;
 			
@@ -1187,7 +1187,7 @@ public final class Record implements Value, CopyableValue<Record> {
 	}
 	
 	/**
-	 * Writes this record to the given output view. This method is similar to {@link #write(DataOutput)}, but
+	 * Writes this record to the given output view. This method is similar to {@link eu.stratosphere.core.io.IOReadableWritable#write(eu.stratosphere.core.memory.DataOutputView)}, but
 	 * it returns the number of bytes written.
 	 * 
 	 * @param target The view to write the record to.
@@ -1262,12 +1262,12 @@ public final class Record implements Value, CopyableValue<Record> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void write(DataOutput out) throws IOException {
+		public void write(DataOutputView out) throws IOException {
 			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public void read(DataInput in) throws IOException {
+		public void read(DataInputView in) throws IOException {
 			throw new UnsupportedOperationException();
 		}		
 	};
@@ -1275,7 +1275,7 @@ public final class Record implements Value, CopyableValue<Record> {
 	/**
 	 * Internal interface class to provide serialization for the data types.
 	 */
-	private static final class InternalDeSerializer implements DataInput, DataOutput, Serializable {
+	private static final class InternalDeSerializer implements DataInputView, DataOutputView, Serializable {
 		
 		private static final long serialVersionUID = 1L;
 		
@@ -1510,6 +1510,49 @@ public final class Record implements Value, CopyableValue<Record> {
 				return n;
 			}
 		}
+
+		@Override
+		public void skipBytesToRead(int numBytes) throws IOException {
+			if(this.end - this.position < numBytes) {
+				throw new EOFException("Could not skip " + numBytes + ".");
+			}
+			skipBytes(numBytes);
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			if(b == null){
+				throw new NullPointerException("Byte array b cannot be null.");
+			}
+
+			if(off < 0){
+				throw new IndexOutOfBoundsException("Offset cannot be negative.");
+			}
+
+			if(len < 0){
+				throw new IndexOutOfBoundsException("Length cannot be negative.");
+			}
+
+			if(b.length - off < len){
+				throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
+						".");
+			}
+
+			if(this.position >= this.end){
+				return -1;
+			}else{
+				int toRead = Math.min(this.end-this.position, len);
+				System.arraycopy(this.memory,this.position,b,off,toRead);
+				this.position += toRead;
+
+				return toRead;
+			}
+		}
+
+		@Override
+		public int read(byte[] b) throws IOException {
+			return read(b, 0, b.length);
+		}
 		
 		// ----------------------------------------------------------------------------------------
 		//                               Data Output
@@ -1736,5 +1779,24 @@ public final class Record implements Value, CopyableValue<Record> {
 		private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
 		
 		private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+
+		@Override
+		public void skipBytesToWrite(int numBytes) throws IOException {
+			int skippedBytes = skipBytes(numBytes);
+
+			if(skippedBytes != numBytes){
+				throw new EOFException("Could not skip " + numBytes + " bytes.");
+			}
+		}
+
+		@Override
+		public void write(DataInputView source, int numBytes) throws IOException {
+			if(numBytes > this.end - this.position){
+				throw new IOException("Could not write " + numBytes + " bytes since the buffer is full.");
+			}
+
+			source.read(this.memory,this.position, numBytes);
+			this.position += numBytes;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/main/java/eu/stratosphere/types/ShortValue.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/types/ShortValue.java b/stratosphere-core/src/main/java/eu/stratosphere/types/ShortValue.java
index 52d6fd2..d242986 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/types/ShortValue.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/types/ShortValue.java
@@ -13,8 +13,6 @@
 
 package eu.stratosphere.types;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.memory.DataInputView;
@@ -80,12 +78,12 @@ public class ShortValue implements NormalizableKey<ShortValue>, ResettableValue<
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public void read(DataInput in) throws IOException {
+	public void read(DataInputView in) throws IOException {
 		this.value = in.readShort();
 	}
 
 	@Override
-	public void write(DataOutput out) throws IOException {
+	public void write(DataOutputView out) throws IOException {
 		out.writeShort(this.value);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/main/java/eu/stratosphere/types/StringValue.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/main/java/eu/stratosphere/types/StringValue.java b/stratosphere-core/src/main/java/eu/stratosphere/types/StringValue.java
index cc970e8..1a133f6 100644
--- a/stratosphere-core/src/main/java/eu/stratosphere/types/StringValue.java
+++ b/stratosphere-core/src/main/java/eu/stratosphere/types/StringValue.java
@@ -470,7 +470,7 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 		int len = in.readUnsignedByte();
 
 		if (len >= HIGH_BIT) {
@@ -508,7 +508,7 @@ public class StringValue implements NormalizableKey<StringValue>, CharSequence,
 	}
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 		int len = this.len;
 
 		// write the length, variable-length encoded

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/BinaryInputFormatTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/BinaryInputFormatTest.java b/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/BinaryInputFormatTest.java
index 7d10a9f..e820013 100644
--- a/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/BinaryInputFormatTest.java
+++ b/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/BinaryInputFormatTest.java
@@ -13,11 +13,11 @@
 
 package eu.stratosphere.api.common.io;
 
-import java.io.DataInput;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
+import eu.stratosphere.core.memory.DataInputView;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -35,7 +35,7 @@ public class BinaryInputFormatTest {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		protected Record deserialize(Record record, DataInput dataInput) {
+		protected Record deserialize(Record record, DataInputView dataInput) {
 			return record;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/SequentialFormatTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/SequentialFormatTest.java b/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/SequentialFormatTest.java
index 420fdf6..164899c 100644
--- a/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/SequentialFormatTest.java
+++ b/stratosphere-core/src/test/java/eu/stratosphere/api/common/io/SequentialFormatTest.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
+import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
@@ -92,7 +93,7 @@ public class SequentialFormatTest {
 			ByteCounter byteCounter = new ByteCounter();
 			DataOutputStream out = new DataOutputStream(byteCounter);
 			for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
-				this.getRecord(recordIndex).write(out);
+				this.getRecord(recordIndex).write(new OutputViewDataOutputStreamWrapper(out));
 			}
 			this.rawDataSizes[fileIndex] = byteCounter.getLength();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/api/distributions/SimpleDataDistributionTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/api/distributions/SimpleDataDistributionTest.java b/stratosphere-core/src/test/java/eu/stratosphere/api/distributions/SimpleDataDistributionTest.java
index c87d2e1..eb83f8c 100644
--- a/stratosphere-core/src/test/java/eu/stratosphere/api/distributions/SimpleDataDistributionTest.java
+++ b/stratosphere-core/src/test/java/eu/stratosphere/api/distributions/SimpleDataDistributionTest.java
@@ -19,6 +19,8 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
+import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
 import junit.framework.Assert;
 
 import org.junit.Test;
@@ -107,7 +109,7 @@ public class SimpleDataDistributionTest {
 		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		final DataOutputStream dos = new DataOutputStream(baos);
 		try {
-			ddWrite.write(dos);
+			ddWrite.write(new OutputViewDataOutputStreamWrapper(dos));
 		} catch (IOException e) {
 			Assert.fail("Error serializing the DataDistribution: " + e.getMessage());
 		}
@@ -120,7 +122,7 @@ public class SimpleDataDistributionTest {
 		SimpleDistribution ddRead = new SimpleDistribution();
 		
 		try {
-			ddRead.read(in);
+			ddRead.read(new InputViewDataInputStreamWrapper(in));
 		} catch (Exception ex) {
 			Assert.fail("The deserialization of the encoded data distribution caused an error");
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/core/testutils/CommonTestUtils.java b/stratosphere-core/src/test/java/eu/stratosphere/core/testutils/CommonTestUtils.java
index dcfdb80..fcdddcb 100644
--- a/stratosphere-core/src/test/java/eu/stratosphere/core/testutils/CommonTestUtils.java
+++ b/stratosphere-core/src/test/java/eu/stratosphere/core/testutils/CommonTestUtils.java
@@ -25,6 +25,8 @@ import java.io.IOException;
 import eu.stratosphere.configuration.ConfigConstants;
 import eu.stratosphere.configuration.GlobalConfiguration;
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
+import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
 
 /**
  * This class contains auxiliary methods for unit tests in the Nephele common module.
@@ -95,7 +97,7 @@ public class CommonTestUtils {
 		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		final DataOutputStream dos = new DataOutputStream(baos);
 
-		original.write(dos);
+		original.write(new OutputViewDataOutputStreamWrapper(dos));
 
 		final String className = original.getClass().getName();
 		if (className == null) {
@@ -130,7 +132,7 @@ public class CommonTestUtils {
 		final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 		final DataInputStream dis = new DataInputStream(bais);
 
-		copy.read(dis);
+		copy.read(new InputViewDataInputStreamWrapper(dis));
 
 		return copy;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/types/CollectionsDataTypeTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/types/CollectionsDataTypeTest.java b/stratosphere-core/src/test/java/eu/stratosphere/types/CollectionsDataTypeTest.java
index 9e81d74..163473e 100644
--- a/stratosphere-core/src/test/java/eu/stratosphere/types/CollectionsDataTypeTest.java
+++ b/stratosphere-core/src/test/java/eu/stratosphere/types/CollectionsDataTypeTest.java
@@ -22,6 +22,8 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map.Entry;
 
+import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
+import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
 import junit.framework.Assert;
 
 import org.junit.Before;
@@ -58,8 +60,8 @@ public class CollectionsDataTypeTest {
 		try {
 			NfIntStringPair mPairActual = new NfIntStringPair();
 
-			pair1.write(out);
-			mPairActual.read(in);
+			pair1.write(new OutputViewDataOutputStreamWrapper(out));
+			mPairActual.read(new InputViewDataInputStreamWrapper(in));
 
 			Assert.assertEquals(pair1, mPairActual);
 		} catch (IOException e) {
@@ -182,8 +184,8 @@ public class CollectionsDataTypeTest {
 		// now test data transfer
 		NfIntStringMap nMap = new NfIntStringMap();
 		try {
-			map0.write(out);
-			nMap.read(in);
+			map0.write(new OutputViewDataOutputStreamWrapper(out));
+			nMap.read(new InputViewDataInputStreamWrapper(in));
 		} catch (Exception e) {
 			Assert.assertTrue(false);
 		}
@@ -210,8 +212,8 @@ public class CollectionsDataTypeTest {
 		// test data transfer
 		NfStringList mList2 = new NfStringList();
 		try {
-			list.write(out);
-			mList2.read(in);
+			list.write(new OutputViewDataOutputStreamWrapper(out));
+			mList2.read(new InputViewDataInputStreamWrapper(in));
 		} catch (Exception e) {
 			Assert.assertTrue(false);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/types/PrimitiveDataTypeTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/types/PrimitiveDataTypeTest.java b/stratosphere-core/src/test/java/eu/stratosphere/types/PrimitiveDataTypeTest.java
index 7e893ed..723ea18 100644
--- a/stratosphere-core/src/test/java/eu/stratosphere/types/PrimitiveDataTypeTest.java
+++ b/stratosphere-core/src/test/java/eu/stratosphere/types/PrimitiveDataTypeTest.java
@@ -19,6 +19,8 @@ import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 
+import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
+import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
 import junit.framework.Assert;
 
 import org.junit.Before;
@@ -56,15 +58,15 @@ public class PrimitiveDataTypeTest {
 		Assert.assertEquals(int0.compareTo(int3), -1);
 		// test stream output and retrieval
 		try {
-			int0.write(mOut);
-			int2.write(mOut);
-			int3.write(mOut);
+			int0.write(new OutputViewDataOutputStreamWrapper(mOut));
+			int2.write(new OutputViewDataOutputStreamWrapper(mOut));
+			int3.write(new OutputViewDataOutputStreamWrapper(mOut));
 			IntValue int1n = new IntValue();
 			IntValue int2n = new IntValue();
 			IntValue int3n = new IntValue();
-			int1n.read(mIn);
-			int2n.read(mIn);
-			int3n.read(mIn);
+			int1n.read(new InputViewDataInputStreamWrapper(mIn));
+			int2n.read(new InputViewDataInputStreamWrapper(mIn));
+			int3n.read(new InputViewDataInputStreamWrapper(mIn));
 			Assert.assertEquals(int0.compareTo(int1n), 0);
 			Assert.assertEquals(int0.getValue(), int1n.getValue());
 			Assert.assertEquals(int2.compareTo(int2n), 0);
@@ -92,15 +94,15 @@ public class PrimitiveDataTypeTest {
 		Assert.assertEquals(double0.compareTo(double3), -1);
 		// test stream output and retrieval
 		try {
-			double0.write(mOut);
-			double2.write(mOut);
-			double3.write(mOut);
+			double0.write(new OutputViewDataOutputStreamWrapper(mOut));
+			double2.write(new OutputViewDataOutputStreamWrapper(mOut));
+			double3.write(new OutputViewDataOutputStreamWrapper(mOut));
 			DoubleValue double1n = new DoubleValue();
 			DoubleValue double2n = new DoubleValue();
 			DoubleValue double3n = new DoubleValue();
-			double1n.read(mIn);
-			double2n.read(mIn);
-			double3n.read(mIn);
+			double1n.read(new InputViewDataInputStreamWrapper(mIn));
+			double2n.read(new InputViewDataInputStreamWrapper(mIn));
+			double3n.read(new InputViewDataInputStreamWrapper(mIn));
 			Assert.assertEquals(double0.compareTo(double1n), 0);
 			Assert.assertEquals(double0.getValue(), double1n.getValue());
 			Assert.assertEquals(double2.compareTo(double2n), 0);
@@ -156,21 +158,21 @@ public class PrimitiveDataTypeTest {
 		
 		// test stream out/input
 		try {
-			string0.write(mOut);
-			string4.write(mOut);
-			string2.write(mOut);
-			string3.write(mOut);
-			string7.write(mOut);
+			string0.write(new OutputViewDataOutputStreamWrapper(mOut));
+			string4.write(new OutputViewDataOutputStreamWrapper(mOut));
+			string2.write(new OutputViewDataOutputStreamWrapper(mOut));
+			string3.write(new OutputViewDataOutputStreamWrapper(mOut));
+			string7.write(new OutputViewDataOutputStreamWrapper(mOut));
 			StringValue string1n = new StringValue();
 			StringValue string2n = new StringValue();
 			StringValue string3n = new StringValue();
 			StringValue string4n = new StringValue();
 			StringValue string7n = new StringValue();
-			string1n.read(mIn);
-			string4n.read(mIn);
-			string2n.read(mIn);
-			string3n.read(mIn);
-			string7n.read(mIn);
+			string1n.read(new InputViewDataInputStreamWrapper(mIn));
+			string4n.read(new InputViewDataInputStreamWrapper(mIn));
+			string2n.read(new InputViewDataInputStreamWrapper(mIn));
+			string3n.read(new InputViewDataInputStreamWrapper(mIn));
+			string7n.read(new InputViewDataInputStreamWrapper(mIn));
 			Assert.assertEquals(string0.compareTo(string1n), 0);
 			Assert.assertEquals(string0.toString(), string1n.toString());
 			Assert.assertEquals(string4.compareTo(string4n), 0);
@@ -209,12 +211,12 @@ public class PrimitiveDataTypeTest {
 		try {
 			// write it multiple times
 			for (int i = 0; i < numWrites; i++) {
-				pn.write(mOut);
+				pn.write(new OutputViewDataOutputStreamWrapper(mOut));
 			}
 			
 			// read it multiple times
 			for (int i = 0; i < numWrites; i++) {
-				pn.read(mIn);
+				pn.read(new InputViewDataInputStreamWrapper(mIn));
 			}
 			
 			Assert.assertEquals("Reading PactNull does not consume the same data as was written.", mIn.available(), 0);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-core/src/test/java/eu/stratosphere/types/RecordTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-core/src/test/java/eu/stratosphere/types/RecordTest.java b/stratosphere-core/src/test/java/eu/stratosphere/types/RecordTest.java
index 2f14257..89b3ce6 100644
--- a/stratosphere-core/src/test/java/eu/stratosphere/types/RecordTest.java
+++ b/stratosphere-core/src/test/java/eu/stratosphere/types/RecordTest.java
@@ -26,6 +26,8 @@ import java.io.PipedOutputStream;
 import java.util.Arrays;
 import java.util.Random;
 
+import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
+import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -59,15 +61,15 @@ public class RecordTest {
 		try {
 			// test deserialize into self
 			Record empty = new Record();
-			empty.write(this.out);
-			empty.read(this.in);
+			empty.write(new OutputViewDataOutputStreamWrapper(this.out));
+			empty.read(new InputViewDataInputStreamWrapper(this.in));
 			Assert.assertTrue("Deserialized Empty record is not another empty record.", empty.getNumFields() == 0);
 			
 			// test deserialize into new
 			empty = new Record();
-			empty.write(this.out);
+			empty.write(new OutputViewDataOutputStreamWrapper(this.out));
 			empty = new Record();
-			empty.read(this.in);
+			empty.read(new InputViewDataInputStreamWrapper(this.in));
 			Assert.assertTrue("Deserialized Empty record is not another empty record.", empty.getNumFields() == 0);
 			
 		} catch (Throwable t) {
@@ -382,18 +384,18 @@ public class RecordTest {
 	
 			try {
 				// serialize and deserialize to remove all buffered info
-				r.write(out);
+				r.write(new OutputViewDataOutputStreamWrapper(out));
 				r = new Record();
-				r.read(in);
+				r.read(new InputViewDataInputStreamWrapper(in));
 	
 				r.setField(1, new IntValue(10));
 				r.setField(4, new StringValue("Some long value"));
 				r.setField(5, new StringValue("An even longer value"));
 				r.setField(10, new IntValue(10));
 	
-				r.write(out);
+				r.write(new OutputViewDataOutputStreamWrapper(out));
 				r = new Record();
-				r.read(in);
+				r.read(new InputViewDataInputStreamWrapper(in));
 	
 				assertTrue(r.getField(0, IntValue.class).getValue() == 0);
 				assertTrue(r.getField(1, IntValue.class).getValue() == 10);
@@ -427,8 +429,8 @@ public class RecordTest {
 			Record record2 = new Record();
 			try {
 				// De/Serialize the record
-				record1.write(this.out);
-				record2.read(this.in);
+				record1.write(new OutputViewDataOutputStreamWrapper(this.out));
+				record2.read(new InputViewDataInputStreamWrapper(this.in));
 	
 				assertTrue(record1.getNumFields() == record2.getNumFields());
 	
@@ -456,20 +458,20 @@ public class RecordTest {
 		try {
 			Record record = new Record(new IntValue(42));
 	
-			record.write(out);
+			record.write(new OutputViewDataOutputStreamWrapper(out));
 			Assert.assertEquals(42, record.getField(0, IntValue.class).getValue());
 	
 			record.setField(0, new IntValue(23));
-			record.write(out);
+			record.write(new OutputViewDataOutputStreamWrapper(out));
 			Assert.assertEquals(23, record.getField(0, IntValue.class).getValue());
 	
 			record.clear();
 			Assert.assertEquals(0, record.getNumFields());
 	
 			Record record2 = new Record(new IntValue(42));
-			record2.read(in);
+			record2.read(new InputViewDataInputStreamWrapper(in));
 			Assert.assertEquals(42, record2.getField(0, IntValue.class).getValue());
-			record2.read(in);
+			record2.read(new InputViewDataInputStreamWrapper(in));
 			Assert.assertEquals(23, record2.getField(0, IntValue.class).getValue());
 		} catch (Throwable t) {
 			Assert.fail("Test failed due to an exception: " + t.getMessage());
@@ -541,7 +543,8 @@ public class RecordTest {
 		}
 	}
 	
-	static final void blackboxTestRecordWithValues(Value[] values, Random rnd, DataInput reader, DataOutput writer)
+	static final void blackboxTestRecordWithValues(Value[] values, Random rnd, DataInputStream reader,
+												   DataOutputStream writer)
 	throws Exception
 	{
 		final int[] permutation1 = createPermutation(rnd, values.length);
@@ -586,9 +589,9 @@ public class RecordTest {
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
-		rec.write(writer);
+		rec.write(new OutputViewDataOutputStreamWrapper(writer));
 		rec = new Record();
-		rec.read(reader);
+		rec.read(new InputViewDataInputStreamWrapper(reader));
 		testAllRetrievalMethods(rec, permutation2, values);
 		
 		// test adding and retrieving with full stream serialization and deserialization into the same record
@@ -597,8 +600,8 @@ public class RecordTest {
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
-		rec.write(writer);
-		rec.read(reader);
+		rec.write(new OutputViewDataOutputStreamWrapper(writer));
+		rec.read(new InputViewDataInputStreamWrapper(reader));
 		testAllRetrievalMethods(rec, permutation2, values);
 		
 		// test adding and retrieving with partial stream serialization and deserialization into a new record
@@ -606,18 +609,18 @@ public class RecordTest {
 		updatePos = rnd.nextInt(values.length + 1);
 		for (int i = 0; i < values.length; i++) {
 			if (i == updatePos) {
-				rec.write(writer);
+				rec.write(new OutputViewDataOutputStreamWrapper(writer));
 				rec = new Record();
-				rec.read(reader);
+				rec.read(new InputViewDataInputStreamWrapper(reader));
 			}
 			
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
 		if (updatePos == values.length) {
-			rec.write(writer);
+			rec.write(new OutputViewDataOutputStreamWrapper(writer));
 			rec = new Record();
-			rec.read(reader);
+			rec.read(new InputViewDataInputStreamWrapper(reader));
 		}
 		testAllRetrievalMethods(rec, permutation2, values);
 		
@@ -626,16 +629,16 @@ public class RecordTest {
 		updatePos = rnd.nextInt(values.length + 1);
 		for (int i = 0; i < values.length; i++) {
 			if (i == updatePos) {
-				rec.write(writer);
-				rec.read(reader);
+				rec.write(new OutputViewDataOutputStreamWrapper(writer));
+				rec.read(new InputViewDataInputStreamWrapper(reader));
 			}
 			
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
 		if (updatePos == values.length) {
-			rec.write(writer);
-			rec.read(reader);
+			rec.write(new OutputViewDataOutputStreamWrapper(writer));
+			rec.read(new InputViewDataInputStreamWrapper(reader));
 		}
 		testAllRetrievalMethods(rec, permutation2, values);
 
@@ -644,17 +647,17 @@ public class RecordTest {
 		updatePos = rnd.nextInt(values.length + 1);
 		for (int i = 0; i < values.length; i++) {
 			if (i == updatePos) {
-				rec.write(writer);
+				rec.write(new OutputViewDataOutputStreamWrapper(writer));
 				rec = new Record();
-				rec.read(reader);
+				rec.read(new InputViewDataInputStreamWrapper(reader));
 			}
 			
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
-		rec.write(writer);
+		rec.write(new OutputViewDataOutputStreamWrapper(writer));
 		rec = new Record();
-		rec.read(reader);
+		rec.read(new InputViewDataInputStreamWrapper(reader));
 		testAllRetrievalMethods(rec, permutation2, values);
 		
 		// test adding and retrieving with partial stream serialization and deserialization into the same record
@@ -662,15 +665,15 @@ public class RecordTest {
 		updatePos = rnd.nextInt(values.length + 1);
 		for (int i = 0; i < values.length; i++) {
 			if (i == updatePos) {
-				rec.write(writer);
-				rec.read(reader);
+				rec.write(new OutputViewDataOutputStreamWrapper(writer));
+				rec.read(new InputViewDataInputStreamWrapper(reader));
 			}
 			
 			final int pos = permutation1[i];
 			rec.setField(pos, values[pos]);
 		}
-		rec.write(writer);
-		rec.read(reader);
+		rec.write(new OutputViewDataOutputStreamWrapper(writer));
+		rec.read(new InputViewDataInputStreamWrapper(reader));
 		testAllRetrievalMethods(rec, permutation2, values);
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
index 82f2755..db75ecd 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/io/CollectionInputFormat.java
@@ -27,8 +27,8 @@ import eu.stratosphere.api.common.io.GenericInputFormat;
 import eu.stratosphere.api.common.io.NonParallelInput;
 import eu.stratosphere.api.common.typeutils.TypeSerializer;
 import eu.stratosphere.core.io.GenericInputSplit;
-import eu.stratosphere.core.memory.InputViewDataInputWrapper;
-import eu.stratosphere.core.memory.OutputViewDataOutputWrapper;
+import eu.stratosphere.core.memory.InputViewObjectInputStreamWrapper;
+import eu.stratosphere.core.memory.OutputViewObjectOutputStreamWrapper;
 
 /**
  * An input format that returns objects from a collection.
@@ -78,11 +78,8 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		out.defaultWriteObject();
 		out.writeInt(dataSet.size());
 		
-		OutputViewDataOutputWrapper outWrapper = new OutputViewDataOutputWrapper();
-		outWrapper.setDelegate(out);
-		
 		for (T element : dataSet){
-			serializer.serialize(element, outWrapper);
+			serializer.serialize(element, new OutputViewObjectOutputStreamWrapper(out));
 		}
 	}
 
@@ -92,12 +89,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
 		int collectionLength = in.readInt();
 		List<T> list = new ArrayList<T>(collectionLength);
 		
-		InputViewDataInputWrapper inWrapper = new InputViewDataInputWrapper();
-		inWrapper.setDelegate(in);
 
 		for (int i = 0; i < collectionLength; i++){
 			T element = serializer.createInstance();
-			element = serializer.deserialize(element, inWrapper);
+			element = serializer.deserialize(element, new InputViewObjectInputStreamWrapper(in));
 			list.add(element);
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/io/ExternalProcessInputSplit.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/io/ExternalProcessInputSplit.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/io/ExternalProcessInputSplit.java
index f07ddd5..e6677a4 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/io/ExternalProcessInputSplit.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/io/ExternalProcessInputSplit.java
@@ -13,12 +13,12 @@
 
 package eu.stratosphere.api.java.record.io;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.GenericInputSplit;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 
 /**
  * The ExternalProcessInputSplit contains all informations for {@link InputFormat} that read their data from external processes.
@@ -58,13 +58,13 @@ public class ExternalProcessInputSplit extends GenericInputSplit {
 	
 	
 	@Override
-	public void read(DataInput in) throws IOException {
+	public void read(DataInputView in) throws IOException {
 		super.read(in);
 		this.extProcessCommand = StringRecord.readString(in);
 	}
 
 	@Override
-	public void write(DataOutput out) throws IOException {
+	public void write(DataOutputView out) throws IOException {
 		super.write(out);
 		StringRecord.writeString(out, this.extProcessCommand);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
index 648c8dc..fa9b740 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/AbstractID.java
@@ -13,12 +13,12 @@
 
 package eu.stratosphere.nephele;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.util.StringUtils;
 import io.netty.buffer.ByteBuf;
 
@@ -152,13 +152,13 @@ public class AbstractID implements IOReadableWritable {
 	}
 
 	@Override
-	public void read(DataInput in) throws IOException {
+	public void read(DataInputView in) throws IOException {
 		this.lowerPart = in.readLong();
 		this.upperPart = in.readLong();
 	}
 
 	@Override
-	public void write(DataOutput out) throws IOException {
+	public void write(DataOutputView out) throws IOException {
 		out.writeLong(this.lowerPart);
 		out.writeLong(this.upperPart);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/AbstractJobResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/AbstractJobResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/AbstractJobResult.java
index 13b337d..931f1a2 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/AbstractJobResult.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/AbstractJobResult.java
@@ -13,12 +13,12 @@
 
 package eu.stratosphere.nephele.client;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.util.EnumUtils;
 
 /**
@@ -78,7 +78,7 @@ public abstract class AbstractJobResult implements IOReadableWritable {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		// Read the return code
 		this.returnCode = EnumUtils.readEnum(in, ReturnCode.class);
@@ -89,7 +89,7 @@ public abstract class AbstractJobResult implements IOReadableWritable {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		// Write the return code
 		EnumUtils.writeEnum(out, this.returnCode);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobProgressResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobProgressResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobProgressResult.java
index 5f4e34b..d6d05ba 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobProgressResult.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobProgressResult.java
@@ -13,11 +13,11 @@
 
 package eu.stratosphere.nephele.client;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
 
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.event.job.AbstractEvent;
 import eu.stratosphere.nephele.util.SerializableArrayList;
 
@@ -62,7 +62,7 @@ public class JobProgressResult extends AbstractJobResult {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 		super.read(in);
 
 		this.events.read(in);
@@ -70,7 +70,7 @@ public class JobProgressResult extends AbstractJobResult {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 		super.write(out);
 
 		this.events.write(out);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobSubmissionResult.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobSubmissionResult.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobSubmissionResult.java
index 9913be5..9443b0b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobSubmissionResult.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/client/JobSubmissionResult.java
@@ -13,8 +13,9 @@
 
 package eu.stratosphere.nephele.client;
 
-import java.io.DataInput;
-import java.io.DataOutput;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
+
 import java.io.IOException;
 
 /**
@@ -49,13 +50,13 @@ public class JobSubmissionResult extends AbstractJobResult {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 		super.read(in);
 	}
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 		super.write(out);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
index 4068e5b..daacd13 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptor.java
@@ -13,11 +13,11 @@
 
 package eu.stratosphere.nephele.deployment;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.runtime.io.channels.ChannelID;
 
 /**
@@ -72,7 +72,7 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		this.outputChannelID.write(out);
 		this.inputChannelID.write(out);
@@ -80,7 +80,7 @@ public final class ChannelDeploymentDescriptor implements IOReadableWritable {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		this.outputChannelID.read(in);
 		this.inputChannelID.read(in);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
index 02d6578..7ad26dd 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptor.java
@@ -13,14 +13,14 @@
 
 package eu.stratosphere.nephele.deployment;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.runtime.io.gates.GateID;
 import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.util.EnumUtils;
@@ -93,7 +93,7 @@ public final class GateDeploymentDescriptor implements IOReadableWritable {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		this.gateID.write(out);
 		EnumUtils.writeEnum(out, channelType);
@@ -106,7 +106,7 @@ public final class GateDeploymentDescriptor implements IOReadableWritable {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		this.gateID.read(in);
 		this.channelType = EnumUtils.readEnum(in, ChannelType.class);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptor.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptor.java
index 06fabde..4282600 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptor.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptor.java
@@ -13,13 +13,13 @@
 
 package eu.stratosphere.nephele.deployment;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
 import eu.stratosphere.nephele.jobgraph.JobID;
@@ -189,7 +189,7 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		this.jobID.write(out);
 		this.vertexID.write(out);
@@ -222,7 +222,7 @@ public final class TaskDeploymentDescriptor implements IOReadableWritable {
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		this.jobID.read(in);
 		this.vertexID.read(in);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java
index 80e65ff..cffec9d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/AbstractEvent.java
@@ -13,14 +13,14 @@
 
 package eu.stratosphere.nephele.event.job;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.concurrent.atomic.AtomicLong;
 
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 
 /**
  * An abstract event is transmitted from the job manager to the
@@ -74,7 +74,7 @@ public abstract class AbstractEvent implements IOReadableWritable {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		// Read the timestamp
 		this.timestamp = in.readLong();
@@ -83,7 +83,7 @@ public abstract class AbstractEvent implements IOReadableWritable {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		// Write the timestamp
 		out.writeLong(this.timestamp);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/ExecutionStateChangeEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/ExecutionStateChangeEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/ExecutionStateChangeEvent.java
index a2cac79..cf451ab 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/ExecutionStateChangeEvent.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/ExecutionStateChangeEvent.java
@@ -13,10 +13,10 @@
 
 package eu.stratosphere.nephele.event.job;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.execution.ExecutionState;
 import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
 import eu.stratosphere.nephele.util.EnumUtils;
@@ -86,7 +86,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		super.read(in);
 
@@ -96,7 +96,7 @@ public final class ExecutionStateChangeEvent extends AbstractEvent implements Ma
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		super.write(out);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/JobEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/JobEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/JobEvent.java
index 420784a..b231978 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/JobEvent.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/JobEvent.java
@@ -13,11 +13,11 @@
 
 package eu.stratosphere.nephele.event.job;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.jobgraph.JobStatus;
 import eu.stratosphere.nephele.util.EnumUtils;
 
@@ -68,7 +68,7 @@ public class JobEvent extends AbstractEvent {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 		super.read(in);
 
 		// Read job status
@@ -80,7 +80,7 @@ public class JobEvent extends AbstractEvent {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 		super.write(out);
 
 		// Write job status

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java
index a477c7a..914adc3 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/RecentJobEvent.java
@@ -13,11 +13,11 @@
 
 package eu.stratosphere.nephele.event.job;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.jobgraph.JobStatus;
 import eu.stratosphere.nephele.util.EnumUtils;
@@ -139,7 +139,7 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 		super.read(in);
 
 		// Read the job ID
@@ -161,7 +161,7 @@ public final class RecentJobEvent extends AbstractEvent implements ManagementEve
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 		super.write(out);
 
 		// Write the job ID

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java
index c86c12b..4b980ed 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java
@@ -13,11 +13,11 @@
 
 package eu.stratosphere.nephele.event.job;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
 
 /**
@@ -83,7 +83,7 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
 	}
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		super.read(in);
 
@@ -93,7 +93,7 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		super.write(out);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexEvent.java
index 940f997..03f1525 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexEvent.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexEvent.java
@@ -13,11 +13,11 @@
 
 package eu.stratosphere.nephele.event.job;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.execution.ExecutionState;
 import eu.stratosphere.nephele.jobgraph.JobVertexID;
 import eu.stratosphere.nephele.util.EnumUtils;
@@ -108,7 +108,7 @@ public class VertexEvent extends AbstractEvent {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		super.read(in);
 
@@ -122,7 +122,7 @@ public class VertexEvent extends AbstractEvent {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		super.write(out);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/IntegerTaskEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/IntegerTaskEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/IntegerTaskEvent.java
index 04e7b18..43e42e9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/IntegerTaskEvent.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/IntegerTaskEvent.java
@@ -30,8 +30,9 @@
 
 package eu.stratosphere.nephele.event.task;
 
-import java.io.DataInput;
-import java.io.DataOutput;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
+
 import java.io.IOException;
 
 /**
@@ -74,13 +75,13 @@ public class IntegerTaskEvent extends AbstractTaskEvent {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 		out.writeInt(this.value);
 	}
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 		this.value = in.readInt();
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/StringTaskEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/StringTaskEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/StringTaskEvent.java
index 7079767..68e1ef5 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/StringTaskEvent.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/task/StringTaskEvent.java
@@ -29,11 +29,11 @@
  */
 package eu.stratosphere.nephele.event.task;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 
 /**
  * This class provides a simple implementation of an event that holds a string value.
@@ -73,14 +73,14 @@ public class StringTaskEvent extends AbstractTaskEvent {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		StringRecord.writeString(out, this.message);
 	}
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		this.message = StringRecord.readString(in);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileRequest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileRequest.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileRequest.java
index dc50012..82e8686 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileRequest.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileRequest.java
@@ -13,12 +13,12 @@
 
 package eu.stratosphere.nephele.execution.librarycache;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 
 /**
  * A library cache profile request includes a set of library names and issues a task manager to report which of these
@@ -54,7 +54,7 @@ public class LibraryCacheProfileRequest implements IOReadableWritable {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		// Read required jar files
 		this.requiredLibraries = new String[in.readInt()];
@@ -66,7 +66,7 @@ public class LibraryCacheProfileRequest implements IOReadableWritable {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		if (this.requiredLibraries == null) {
 			throw new IOException("requiredLibraries is null");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileResponse.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileResponse.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileResponse.java
index 3a2c92c..7c1ec26 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileResponse.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheProfileResponse.java
@@ -13,12 +13,12 @@
 
 package eu.stratosphere.nephele.execution.librarycache;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 
 /**
  * A library cache profile response is the response to a library cache profile request. It contains the set of
@@ -92,7 +92,7 @@ public class LibraryCacheProfileResponse implements IOReadableWritable {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		// Read the names of the required jar files
 		this.requiredLibraries = new String[in.readInt()];
@@ -110,7 +110,7 @@ public class LibraryCacheProfileResponse implements IOReadableWritable {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		if (this.requiredLibraries == null) {
 			throw new IOException("requiredLibraries is null");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheUpdate.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheUpdate.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheUpdate.java
index 05473c7..6f7c404 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheUpdate.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/librarycache/LibraryCacheUpdate.java
@@ -13,11 +13,11 @@
 
 package eu.stratosphere.nephele.execution.librarycache;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 
 /**
  * This class is used to encapsulate the transmission of a library file in a Nephele RPC call.
@@ -48,14 +48,14 @@ public class LibraryCacheUpdate implements IOReadableWritable {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		LibraryCacheManager.readLibraryFromStream(in);
 	}
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		if (this.libraryFileName == null) {
 			throw new IOException("libraryFileName is null");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/HardwareDescription.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/HardwareDescription.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/HardwareDescription.java
index 83d650a..2ca6068 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/HardwareDescription.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/HardwareDescription.java
@@ -13,11 +13,11 @@
 
 package eu.stratosphere.nephele.instance;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 
 /**
  * A hardware description reflects the hardware environment which is actually present on the task manager's compute
@@ -68,7 +68,7 @@ public final class HardwareDescription implements IOReadableWritable {
 	}
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		out.writeInt(this.numberOfCPUCores);
 		out.writeLong(this.sizeOfPhysicalMemory);
@@ -76,7 +76,7 @@ public final class HardwareDescription implements IOReadableWritable {
 	}
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		this.numberOfCPUCores = in.readInt();
 		this.sizeOfPhysicalMemory = in.readLong();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
index 98c3810..b47f649 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/InstanceConnectionInfo.java
@@ -13,14 +13,14 @@
 
 package eu.stratosphere.nephele.instance;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.util.StringUtils;
 
 /**
@@ -206,7 +206,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		final int addr_length = in.readInt();
 		byte[] address = new byte[addr_length];
@@ -226,7 +226,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		out.writeInt(this.inetAddress.getAddress().length);
 		out.write(this.inetAddress.getAddress());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
index ea7bd4a..3e915cc 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Client.java
@@ -21,6 +21,7 @@ package eu.stratosphere.nephele.ipc;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
 import eu.stratosphere.nephele.net.NetUtils;
 import eu.stratosphere.nephele.util.IOUtils;
 import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
@@ -514,7 +515,7 @@ public class Client {
 						} catch (IllegalAccessException e) {
 							LOG.error(e);
 						}
-						value.read(in); // read value
+						value.read(new InputViewDataInputStreamWrapper(in)); // read value
 					}
 					call.setValue(value);
 				} else if (state == Status.ERROR.state) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/ConnectionHeader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/ConnectionHeader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/ConnectionHeader.java
index d97dceb..78cffd3 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/ConnectionHeader.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/ConnectionHeader.java
@@ -19,12 +19,12 @@
 
 package eu.stratosphere.nephele.ipc;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 
 /**
  * The IPC connection header sent by the client to the server
@@ -49,14 +49,14 @@ class ConnectionHeader implements IOReadableWritable {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		this.protocol = StringRecord.readString(in);
 	}
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		StringRecord.writeString(out, this.protocol);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/RPC.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/RPC.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/RPC.java
index 8ff3548..f887673 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/RPC.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/RPC.java
@@ -19,8 +19,6 @@
 
 package eu.stratosphere.nephele.ipc;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
@@ -34,6 +32,8 @@ import java.util.Map;
 
 import javax.net.SocketFactory;
 
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -102,7 +102,7 @@ public class RPC {
 
 		// TODO: See if type safety can be improved here
 		@SuppressWarnings("unchecked")
-		public void read(DataInput in) throws IOException {
+		public void read(DataInputView in) throws IOException {
 
 			this.methodName = StringRecord.readString(in);
 			this.parameters = new IOReadableWritable[in.readInt()];
@@ -140,7 +140,7 @@ public class RPC {
 			}
 		}
 
-		public void write(DataOutput out) throws IOException {
+		public void write(DataOutputView out) throws IOException {
 			StringRecord.writeString(out, methodName);
 			out.writeInt(parameterClasses.length);
 			for (int i = 0; i < parameterClasses.length; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Server.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Server.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Server.java
index 7929e5f..6757c81 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Server.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ipc/Server.java
@@ -52,6 +52,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import eu.stratosphere.core.memory.InputViewDataInputStreamWrapper;
+import eu.stratosphere.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -875,7 +877,7 @@ public abstract class Server {
 		// / Reads the connection header following version
 		private void processProtocol() throws IOException {
 			DataInputStream in = new DataInputStream(new ByteArrayInputStream(data.array()));
-			header.read(in);
+			header.read(new InputViewDataInputStreamWrapper(in));
 			try {
 				String protocolClassName = header.getProtocol();
 				if (protocolClassName != null) {
@@ -894,7 +896,7 @@ public abstract class Server {
 
 
 			IOReadableWritable invocation = newInstance(invocationClass); // read param
-			invocation.read(dis);
+			invocation.read(new InputViewDataInputStreamWrapper(dis));
 
 			Call call = new Call(id, invocation, this);
 			callQueue.put(call); // queue the call; maybe blocked here
@@ -1048,7 +1050,7 @@ public abstract class Server {
 			} else {
 				out.writeBoolean(true);
 				StringRecord.writeString(out, rv.getClass().getName());
-				rv.write(out);
+				rv.write(new OutputViewDataOutputStreamWrapper(out));
 			}
 
 		} else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
index cc936d9..5ccb428 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/AbstractJobVertex.java
@@ -13,11 +13,11 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import org.apache.commons.lang.Validate;
 
 import eu.stratosphere.configuration.Configuration;
@@ -374,7 +374,7 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		if (jobGraph == null) {
 			throw new IOException("jobGraph is null, cannot deserialize");
@@ -454,7 +454,7 @@ public abstract class AbstractJobVertex implements IOReadableWritable {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		// Number of subtasks
 		out.writeInt(this.numberOfSubtasks);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
index 3d14d0a..b88fad0 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobgraph/JobGraph.java
@@ -13,8 +13,6 @@
 
 package eu.stratosphere.nephele.jobgraph;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -34,6 +32,8 @@ import eu.stratosphere.core.fs.FileSystem;
 import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
 import eu.stratosphere.util.ClassUtils;
 
@@ -465,7 +465,7 @@ public class JobGraph implements IOReadableWritable {
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		// Read job id
 		this.jobID.read(in);
@@ -553,7 +553,7 @@ public class JobGraph implements IOReadableWritable {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		// Write job ID
 		this.jobID.write(out);
@@ -598,7 +598,8 @@ public class JobGraph implements IOReadableWritable {
 	 * @throws IOException
 	 *         thrown if an error occurs while writing to the stream
 	 */
-	private void writeRequiredJarFiles(final DataOutput out, final AbstractJobVertex[] jobVertices) throws IOException {
+	private void writeRequiredJarFiles(final DataOutputView out, final AbstractJobVertex[] jobVertices) throws
+			IOException {
 
 		// Now check if all the collected jar files really exist
 		final FileSystem fs = FileSystem.getLocalFileSystem();
@@ -643,7 +644,7 @@ public class JobGraph implements IOReadableWritable {
 	 * @throws IOException
 	 *         thrown if an error occurs while reading the stream
 	 */
-	private void readRequiredJarFiles(final DataInput in) throws IOException {
+	private void readRequiredJarFiles(final DataInputView in) throws IOException {
 
 		// Do jar files follow;
 		final int numJars = in.readInt();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitWrapper.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitWrapper.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitWrapper.java
index 811b919..ac18a84 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitWrapper.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/splitassigner/InputSplitWrapper.java
@@ -13,13 +13,13 @@
 
 package eu.stratosphere.nephele.jobmanager.splitassigner;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.InputSplit;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.util.StringUtils;
@@ -68,7 +68,7 @@ public final class InputSplitWrapper implements IOReadableWritable {
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		// Write the job ID
 		this.jobID.write(out);
@@ -90,7 +90,7 @@ public final class InputSplitWrapper implements IOReadableWritable {
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		// Read the job ID
 		this.jobID.read(in);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
index fab720d..54710f3 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGraph.java
@@ -13,8 +13,6 @@
 
 package eu.stratosphere.nephele.managementgraph;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -28,6 +26,8 @@ import java.util.Map;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.util.EnumUtils;
@@ -406,7 +406,7 @@ public final class ManagementGraph extends ManagementAttachment implements IORea
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		// Read job ID
 		this.jobID.read(in);
@@ -485,7 +485,7 @@ public final class ManagementGraph extends ManagementAttachment implements IORea
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		// Write job ID
 		this.jobID.write(out);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
index b98a153..afe64ec 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementGroupVertex.java
@@ -13,8 +13,6 @@
 
 package eu.stratosphere.nephele.managementgraph;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -23,6 +21,8 @@ import java.util.List;
 import java.util.Map;
 
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.execution.ExecutionState;
 import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.util.EnumUtils;
@@ -342,7 +342,7 @@ public final class ManagementGroupVertex extends ManagementAttachment implements
 
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		int numberOfForwardLinks = in.readInt();
 		for (int i = 0; i < numberOfForwardLinks; i++) {
@@ -359,7 +359,7 @@ public final class ManagementGroupVertex extends ManagementAttachment implements
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		// Write the number of forward links
 		out.writeInt(this.forwardEdges.size());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
index a270700..c0a72b1 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/managementgraph/ManagementVertex.java
@@ -13,14 +13,14 @@
 
 package eu.stratosphere.nephele.managementgraph;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.io.StringRecord;
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.execution.ExecutionState;
 import eu.stratosphere.nephele.util.EnumUtils;
 import eu.stratosphere.util.StringUtils;
@@ -266,7 +266,7 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
 	}
 
 	@Override
-	public void read(final DataInput in) throws IOException {
+	public void read(final DataInputView in) throws IOException {
 
 		// Read the execution state
 		this.executionState = EnumUtils.readEnum(in, ExecutionState.class);
@@ -288,7 +288,7 @@ public final class ManagementVertex extends ManagementAttachment implements IORe
 
 
 	@Override
-	public void write(final DataOutput out) throws IOException {
+	public void write(final DataOutputView out) throws IOException {
 
 		// Write the execution state
 		EnumUtils.writeEnum(out, this.executionState);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalExecutionVertexProfilingData.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalExecutionVertexProfilingData.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalExecutionVertexProfilingData.java
index a028835..df3e7db 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalExecutionVertexProfilingData.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalExecutionVertexProfilingData.java
@@ -13,10 +13,10 @@
 
 package eu.stratosphere.nephele.profiling.impl.types;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
 import eu.stratosphere.nephele.jobgraph.JobID;
 
@@ -47,14 +47,14 @@ public abstract class InternalExecutionVertexProfilingData implements InternalPr
 	}
 
 	@Override
-	public void read(DataInput in) throws IOException {
+	public void read(DataInputView in) throws IOException {
 
 		this.jobId.read(in);
 		this.executionVertexID.read(in);
 	}
 
 	@Override
-	public void write(DataOutput out) throws IOException {
+	public void write(DataOutputView out) throws IOException {
 
 		this.jobId.write(out);
 		this.executionVertexID.write(out);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
index f1064f6..ac40ae9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
@@ -13,10 +13,10 @@
 
 package eu.stratosphere.nephele.profiling.impl.types;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
 import eu.stratosphere.nephele.jobgraph.JobID;
 
@@ -48,7 +48,7 @@ public class InternalExecutionVertexThreadProfilingData extends InternalExecutio
 	}
 
 	@Override
-	public void read(DataInput in) throws IOException {
+	public void read(DataInputView in) throws IOException {
 
 		super.read(in);
 
@@ -60,7 +60,7 @@ public class InternalExecutionVertexThreadProfilingData extends InternalExecutio
 	}
 
 	@Override
-	public void write(DataOutput out) throws IOException {
+	public void write(DataOutputView out) throws IOException {
 
 		super.write(out);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/98e659e5/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalInputGateProfilingData.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalInputGateProfilingData.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalInputGateProfilingData.java
index 376c7ce..80f9b7a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalInputGateProfilingData.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/profiling/impl/types/InternalInputGateProfilingData.java
@@ -13,10 +13,10 @@
 
 package eu.stratosphere.nephele.profiling.impl.types;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
+import eu.stratosphere.core.memory.DataInputView;
+import eu.stratosphere.core.memory.DataOutputView;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
 import eu.stratosphere.nephele.jobgraph.JobID;
 
@@ -50,7 +50,7 @@ public class InternalInputGateProfilingData implements InternalProfilingData {
 	}
 
 	@Override
-	public void read(DataInput in) throws IOException {
+	public void read(DataInputView in) throws IOException {
 
 		this.jobId.read(in);
 		this.executionVertexID.read(in);
@@ -60,7 +60,7 @@ public class InternalInputGateProfilingData implements InternalProfilingData {
 	}
 
 	@Override
-	public void write(DataOutput out) throws IOException {
+	public void write(DataOutputView out) throws IOException {
 
 		this.jobId.write(out);
 		this.executionVertexID.write(out);


Mime
View raw message