flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [12/34] Offer buffer-oriented API for I/O (#25)
Date Tue, 10 Jun 2014 19:35:09 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java
new file mode 100644
index 0000000..dde26e3
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnectionThread.java
@@ -0,0 +1,276 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.tcp;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import eu.stratosphere.util.StringUtils;
+
+public class OutgoingConnectionThread extends Thread {
+
+	/**
+	 * The minimum time a TCP connection must be idle it is closed.
+	 */
+	private static final long MIN_IDLE_TIME_BEFORE_CLOSE = 80000L; // 80 seconds
+
+	private static final Log LOG = LogFactory.getLog(OutgoingConnectionThread.class);
+
+	private final Selector selector;
+
+	private final Queue<OutgoingConnection> pendingConnectionRequests = new ArrayDeque<OutgoingConnection>();
+
+	private final Queue<SelectionKey> pendingWriteEventSubscribeRequests = new ArrayDeque<SelectionKey>();
+
+	private final Map<OutgoingConnection, Long> connectionsToClose = new HashMap<OutgoingConnection, Long>();
+
+	public OutgoingConnectionThread() throws IOException {
+		super("Outgoing Connection Thread");
+
+		this.selector = Selector.open();
+	}
+
+
+	@Override
+	public void run() {
+		try {
+			while (!isInterrupted()) {
+	
+				synchronized (this.pendingConnectionRequests) {
+	
+					if (!this.pendingConnectionRequests.isEmpty()) {
+	
+						final OutgoingConnection outgoingConnection = this.pendingConnectionRequests.poll();
+						try {
+							final SocketChannel socketChannel = SocketChannel.open();
+							socketChannel.configureBlocking(false);
+							final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
+							socketChannel.connect(outgoingConnection.getConnectionAddress());
+							key.attach(outgoingConnection);
+						} catch (final IOException ioe) {
+							// IOException is reported by separate thread to avoid deadlocks
+							final Runnable reporterThread = new Runnable() {
+	
+								@Override
+								public void run() {
+									outgoingConnection.reportConnectionProblem(ioe);
+								}
+							};
+							new Thread(reporterThread).start();
+						}
+					}
+				}
+	
+				synchronized (this.pendingWriteEventSubscribeRequests) {
+	
+					if (!this.pendingWriteEventSubscribeRequests.isEmpty()) {
+						final SelectionKey oldSelectionKey = this.pendingWriteEventSubscribeRequests.poll();
+						final OutgoingConnection outgoingConnection = (OutgoingConnection) oldSelectionKey.attachment();
+						final SocketChannel socketChannel = (SocketChannel) oldSelectionKey.channel();
+	
+						try {
+							final SelectionKey newSelectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ
+								| SelectionKey.OP_WRITE);
+							newSelectionKey.attach(outgoingConnection);
+							outgoingConnection.setSelectionKey(newSelectionKey);
+						} catch (final IOException ioe) {
+							// IOException is reported by separate thread to avoid deadlocks
+							final Runnable reporterThread = new Runnable() {
+	
+								@Override
+								public void run() {
+									outgoingConnection.reportTransmissionProblem(ioe);
+								}
+							};
+							new Thread(reporterThread).start();
+						}
+					}
+				}
+	
+				synchronized (this.connectionsToClose) {
+	
+					final Iterator<Map.Entry<OutgoingConnection, Long>> closeIt = this.connectionsToClose.entrySet()
+						.iterator();
+					final long now = System.currentTimeMillis();
+					while (closeIt.hasNext()) {
+	
+						final Map.Entry<OutgoingConnection, Long> entry = closeIt.next();
+						if ((entry.getValue().longValue() + MIN_IDLE_TIME_BEFORE_CLOSE) < now) {
+							final OutgoingConnection outgoingConnection = entry.getKey();
+							closeIt.remove();
+							// Create new thread to close connection to avoid deadlocks
+							final Runnable closeThread = new Runnable() {
+	
+								@Override
+								public void run() {
+									try {
+										outgoingConnection.closeConnection();
+									} catch (IOException ioe) {
+										outgoingConnection.reportTransmissionProblem(ioe);
+									}
+								}
+							};
+	
+							new Thread(closeThread).start();
+						}
+	
+					}
+				}
+	
+				try {
+					this.selector.select(10);
+				} catch (IOException e) {
+					LOG.error(e);
+				}
+	
+				final Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
+	
+				while (iter.hasNext()) {
+					final SelectionKey key = iter.next();
+	
+					iter.remove();
+					if (key.isValid()) {
+						if (key.isConnectable()) {
+							doConnect(key);
+						} else {
+							if (key.isReadable()) {
+								doRead(key);
+								// A read will always result in an exception, so the write key will not be valid anymore
+								continue;
+							}
+							if (key.isWritable()) {
+								doWrite(key);
+							}
+						}
+					} else {
+						LOG.error("Received invalid key: " + key);
+					}
+				}
+			}
+	
+			// Finally, try to close the selector
+			try {
+				this.selector.close();
+			} catch (IOException ioe) {
+				LOG.debug(StringUtils.stringifyException(ioe));
+			}
+		}
+		catch (Throwable t) {
+			// this is a disaster, this task manager cannot go on!
+			LOG.fatal("Outgoing connection thread died with an exception: " + t.getMessage(), t);
+			System.exit(1);
+		}
+	}
+
+	private void doConnect(SelectionKey key) {
+
+		final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
+		final SocketChannel socketChannel = (SocketChannel) key.channel();
+		try {
+			while (!socketChannel.finishConnect()) {
+				try {
+					Thread.sleep(100);
+				} catch (InterruptedException e1) {
+					LOG.error(e1);
+				}
+			}
+
+			final SelectionKey channelKey = socketChannel.register(selector, SelectionKey.OP_WRITE
+				| SelectionKey.OP_READ);
+			outgoingConnection.setSelectionKey(channelKey);
+			channelKey.attach(outgoingConnection);
+
+		} catch (IOException ioe) {
+			outgoingConnection.reportConnectionProblem(ioe);
+		}
+	}
+
+	private void doWrite(SelectionKey key) {
+
+		final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
+
+		try {
+
+			if (!outgoingConnection.write()) {
+				// Try to close the connection
+				outgoingConnection.requestClose();
+			}
+
+		} catch (IOException ioe) {
+			outgoingConnection.reportTransmissionProblem(ioe);
+		}
+	}
+
+	private void doRead(SelectionKey key) {
+
+		final SocketChannel socketChannel = (SocketChannel) key.channel();
+		final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
+		final ByteBuffer buf = ByteBuffer.allocate(8);
+
+		try {
+
+			if (socketChannel.read(buf) == -1) {
+				outgoingConnection.reportTransmissionProblem(new IOException(
+					"Read unexpected EOF from channel"));
+			} else {
+				LOG.error("Outgoing connection read real data from channel");
+			}
+		} catch (IOException ioe) {
+			outgoingConnection.reportTransmissionProblem(ioe);
+		}
+	}
+
+	public void triggerConnect(OutgoingConnection outgoingConnection) {
+
+		synchronized (this.pendingConnectionRequests) {
+			this.pendingConnectionRequests.add(outgoingConnection);
+		}
+	}
+
+	public void unsubscribeFromWriteEvent(SelectionKey selectionKey) throws IOException {
+
+		final SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
+		final OutgoingConnection outgoingConnection = (OutgoingConnection) selectionKey.attachment();
+
+		final SelectionKey newSelectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
+		newSelectionKey.attach(outgoingConnection);
+		outgoingConnection.setSelectionKey(newSelectionKey);
+
+		synchronized (this.connectionsToClose) {
+			this.connectionsToClose.put(outgoingConnection, Long.valueOf(System.currentTimeMillis()));
+		}
+	}
+
+	public void subscribeToWriteEvent(SelectionKey selectionKey) {
+
+		synchronized (this.pendingWriteEventSubscribeRequests) {
+			this.pendingWriteEventSubscribeRequests.add(selectionKey);
+		}
+		synchronized (this.connectionsToClose) {
+			this.connectionsToClose.remove((OutgoingConnection) selectionKey.attachment());
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java
new file mode 100644
index 0000000..f4c8aec
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -0,0 +1,521 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.serialization.DataInputDeserializer;
+import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
+import eu.stratosphere.runtime.io.serialization.RecordDeserializer;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * @param <T> The type of the record to be deserialized.
+ */
+public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> {
+	
+	private final NonSpanningWrapper nonSpanningWrapper;
+	
+	private final SpanningWrapper spanningWrapper;
+
+	public AdaptiveSpanningRecordDeserializer() {
+		this.nonSpanningWrapper = new NonSpanningWrapper();
+		this.spanningWrapper = new SpanningWrapper();
+	}
+	
+	@Override
+	public void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException {
+		// check if some spanning record deserialization is pending
+		if (this.spanningWrapper.getNumGatheredBytes() > 0) {
+			this.spanningWrapper.addNextChunkFromMemorySegment(segment, numBytes);
+		}
+		else {
+			this.nonSpanningWrapper.initializeFromMemorySegment(segment, 0, numBytes);
+		}
+	}
+	
+	@Override
+	public DeserializationResult getNextRecord(T target) throws IOException {
+		// always check the non-spanning wrapper first.
+		// this should be the majority of the cases for small records
+		// for large records, this portion of the work is very small in comparison anyways
+		
+		int nonSpanningRemaining = this.nonSpanningWrapper.remaining();
+		
+		// check if we can get a full length;
+		if (nonSpanningRemaining >= 4) {
+			int len = this.nonSpanningWrapper.readInt();
+			if (len <= nonSpanningRemaining - 4) {
+				// we can get a full record from here
+				target.read(this.nonSpanningWrapper);
+				
+				return (this.nonSpanningWrapper.remaining() == 0) ?
+					DeserializationResult.LAST_RECORD_FROM_BUFFER :
+					DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+			} else {
+				// we got the length, but we need the rest from the spanning deserializer
+				// and need to wait for more buffers
+				this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len);
+				this.nonSpanningWrapper.clear();
+				return DeserializationResult.PARTIAL_RECORD;
+			}
+		} else if (nonSpanningRemaining > 0) {
+			// we have an incomplete length
+			// add our part of the length to the length buffer
+			this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper);
+			this.nonSpanningWrapper.clear();
+			return DeserializationResult.PARTIAL_RECORD;
+		}
+		
+		// spanning record case
+		if (this.spanningWrapper.hasFullRecord()) {
+			// get the full record
+			target.read(this.spanningWrapper);
+			
+			// move the remainder to the non-spanning wrapper
+			// this does not copy it, only sets the memory segment
+			this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
+			this.spanningWrapper.clear();
+			
+			return (this.nonSpanningWrapper.remaining() == 0) ?
+				DeserializationResult.LAST_RECORD_FROM_BUFFER :
+				DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+//		} else if (this.spanningWrapper.getNumGatheredBytes() == 0) {
+//			// error case. we are in the spanning deserializer, but it has no bytes, yet
+//			throw new IllegalStateException();
+		} else {
+			return DeserializationResult.PARTIAL_RECORD;
+		}
+	}
+
+	@Override
+	public void clear() {
+		this.nonSpanningWrapper.clear();
+		this.spanningWrapper.clear();
+	}
+
+	@Override
+	public boolean hasUnfinishedData() {
+		return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	
+	private static final class NonSpanningWrapper implements DataInput {
+		
+		private MemorySegment segment;
+		
+		private int limit;
+		
+		private int position;
+		
+		private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
+		private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
+		
+		int remaining() {
+			return this.limit - this.position;
+		}
+		
+		void clear() {
+			this.segment = null;
+			this.limit = 0;
+			this.position = 0;
+		}
+		
+		void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
+			this.segment = seg;
+			this.position = position;
+			this.limit = leftOverLimit;
+		}
+		
+		// -------------------------------------------------------------------------------------------------------------
+		//                                       DataInput specific methods
+		// -------------------------------------------------------------------------------------------------------------
+		
+		@Override
+		public final void readFully(byte[] b) throws IOException {
+			readFully(b, 0, b.length);
+		}
+
+		@Override
+		public final void readFully(byte[] b, int off, int len) throws IOException {
+			if (off < 0 || len < 0 || off + len > b.length)
+				throw new IndexOutOfBoundsException();
+			
+			this.segment.get(this.position, b, off, len);
+			this.position += len;
+		}
+
+		@Override
+		public final boolean readBoolean() throws IOException {
+			return readByte() == 1;
+		}
+
+		@Override
+		public final byte readByte() throws IOException {
+			return this.segment.get(this.position++);
+		}
+
+		@Override
+		public final int readUnsignedByte() throws IOException {
+			return readByte() & 0xff;
+		}
+
+		@Override
+		public final short readShort() throws IOException {
+			final short v = this.segment.getShort(this.position);
+			this.position += 2;
+			return v;
+		}
+
+		@Override
+		public final int readUnsignedShort() throws IOException {
+			final int v = this.segment.getShort(this.position) & 0xffff;
+			this.position += 2;
+			return v;
+		}
+
+		@Override
+		public final char readChar() throws IOException  {
+			final char v = this.segment.getChar(this.position);
+			this.position += 2;
+			return v;
+		}
+
+		@Override
+		public final int readInt() throws IOException {
+			final int v = this.segment.getIntBigEndian(this.position);
+			this.position += 4;
+			return v;
+		}
+
+		@Override
+		public final long readLong() throws IOException {
+			final long v = this.segment.getLongBigEndian(this.position);
+			this.position += 8;
+			return v;
+		}
+
+		@Override
+		public final float readFloat() throws IOException {
+			return Float.intBitsToFloat(readInt());
+		}
+
+		@Override
+		public final double readDouble() throws IOException {
+			return Double.longBitsToDouble(readLong());
+		}
+
+		@Override
+		public final String readLine() throws IOException {
+			final StringBuilder bld = new StringBuilder(32);
+			
+			try {
+				int b;
+				while ((b = readUnsignedByte()) != '\n') {
+					if (b != '\r')
+						bld.append((char) b);
+				}
+			}
+			catch (EOFException eofex) {}
+
+			if (bld.length() == 0)
+				return null;
+			
+			// trim a trailing carriage return
+			int len = bld.length();
+			if (len > 0 && bld.charAt(len - 1) == '\r') {
+				bld.setLength(len - 1);
+			}
+			return bld.toString();
+		}
+
+		@Override
+		public final String readUTF() throws IOException {
+			final int utflen = readUnsignedShort();
+			
+			final byte[] bytearr;
+			final char[] chararr;
+			
+			if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
+				bytearr = new byte[utflen];
+				this.utfByteBuffer = bytearr;
+			} else {
+				bytearr = this.utfByteBuffer;
+			}
+			if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
+				chararr = new char[utflen];
+				this.utfCharBuffer = chararr;
+			} else {
+				chararr = this.utfCharBuffer;
+			}
+
+			int c, char2, char3;
+			int count = 0;
+			int chararr_count = 0;
+
+			readFully(bytearr, 0, utflen);
+
+			while (count < utflen) {
+				c = (int) bytearr[count] & 0xff;
+				if (c > 127)
+					break;
+				count++;
+				chararr[chararr_count++] = (char) c;
+			}
+
+			while (count < utflen) {
+				c = (int) bytearr[count] & 0xff;
+				switch (c >> 4) {
+				case 0:
+				case 1:
+				case 2:
+				case 3:
+				case 4:
+				case 5:
+				case 6:
+				case 7:
+					count++;
+					chararr[chararr_count++] = (char) c;
+					break;
+				case 12:
+				case 13:
+					count += 2;
+					if (count > utflen)
+						throw new UTFDataFormatException("malformed input: partial character at end");
+					char2 = (int) bytearr[count - 1];
+					if ((char2 & 0xC0) != 0x80)
+						throw new UTFDataFormatException("malformed input around byte " + count);
+					chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+					break;
+				case 14:
+					count += 3;
+					if (count > utflen)
+						throw new UTFDataFormatException("malformed input: partial character at end");
+					char2 = (int) bytearr[count - 2];
+					char3 = (int) bytearr[count - 1];
+					if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+						throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+					chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+					break;
+				default:
+					throw new UTFDataFormatException("malformed input around byte " + count);
+				}
+			}
+			// The number of chars produced may be less than utflen
+			return new String(chararr, 0, chararr_count);
+		}
+		
+		@Override
+		public final int skipBytes(int n) throws IOException {
+			if (n < 0)
+				throw new IllegalArgumentException();
+			
+			int toSkip = Math.min(n, remaining());
+			this.position += toSkip;
+			return toSkip;
+		}
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	
+	private static final class SpanningWrapper implements DataInput {
+
+		private final DataOutputSerializer serializationBuffer;
+
+		private final DataInputDeserializer serializationReadBuffer;
+
+		private final ByteBuffer lengthBuffer;
+
+		private int recordLength;
+
+		private MemorySegment leftOverData;
+
+		private int leftOverStart;
+
+		private int leftOverLimit;
+
+		private int recordLimit;
+
+		public SpanningWrapper() {
+			this.lengthBuffer = ByteBuffer.allocate(4);
+			this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
+
+			this.recordLength = -1;
+
+			this.serializationBuffer = new DataOutputSerializer(1024);
+			this.serializationReadBuffer = new DataInputDeserializer();
+		}
+		
+		private void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
+			// set the length and copy what is available to the buffer
+			this.recordLength = nextRecordLength;
+			this.recordLimit = partial.remaining();
+			partial.segment.get(this.serializationBuffer, partial.position, partial.remaining());
+			this.serializationReadBuffer.setBuffer(this.serializationBuffer.wrapAsByteBuffer());
+		}
+		
+		private void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException {
+			// copy what we have to the length buffer
+			partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
+		}
+		
+		private void addNextChunkFromMemorySegment(MemorySegment segment, int numBytesInSegment) throws IOException {
+			int segmentPosition = 0;
+			
+			// check where to go. if we have a partial length, we need to complete it first
+			if (this.lengthBuffer.position() > 0) {
+				int toPut = Math.min(this.lengthBuffer.remaining(), numBytesInSegment);
+				segment.get(0, this.lengthBuffer, toPut);
+				
+				// did we complete the length?
+				if (this.lengthBuffer.hasRemaining()) {
+					return;
+				} else {
+					this.recordLength = this.lengthBuffer.getInt(0);
+					this.lengthBuffer.clear();
+					segmentPosition = toPut;
+				}
+			}
+
+			// copy as much as we need or can for this next spanning record
+			int needed = this.recordLength - this.recordLimit;
+			int available = numBytesInSegment - segmentPosition;
+			int toCopy = Math.min(needed, available);
+
+			segment.get(this.serializationBuffer, segmentPosition, toCopy);
+			this.recordLimit += toCopy;
+			
+			if (toCopy < available) {
+				// there is more data in the segment
+				this.leftOverData = segment;
+				this.leftOverStart = segmentPosition + toCopy;
+				this.leftOverLimit = numBytesInSegment;
+			}
+
+			// update read view
+			this.serializationReadBuffer.setBuffer(this.serializationBuffer.wrapAsByteBuffer());
+		}
+		
+		private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
+			deserializer.clear();
+			
+			if (leftOverData != null) {
+				deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
+			}
+		}
+		
+		private boolean hasFullRecord() {
+			return this.recordLength >= 0 && this.recordLimit >= this.recordLength;
+		}
+		
+		private int getNumGatheredBytes() {
+			return this.recordLimit + (this.recordLength >= 0 ? 4 : lengthBuffer.position()) + this.serializationBuffer.length();
+		}
+
+		public void clear() {
+			this.serializationBuffer.clear();
+
+			this.recordLength = -1;
+			this.lengthBuffer.clear();
+			this.leftOverData = null;
+			this.recordLimit = 0;
+		}
+
+		// -------------------------------------------------------------------------------------------------------------
+		//                                       DataInput specific methods
+		// -------------------------------------------------------------------------------------------------------------
+
+		@Override
+		public void readFully(byte[] b) throws IOException {
+			this.serializationReadBuffer.readFully(b);
+		}
+
+		@Override
+		public void readFully(byte[] b, int off, int len) throws IOException {
+			this.serializationReadBuffer.readFully(b, off, len);
+		}
+
+		@Override
+		public int skipBytes(int n) throws IOException {
+			return this.serializationReadBuffer.skipBytes(n);
+		}
+
+		@Override
+		public boolean readBoolean() throws IOException {
+			return this.serializationReadBuffer.readBoolean();
+		}
+
+		@Override
+		public byte readByte() throws IOException {
+			return this.serializationReadBuffer.readByte();
+		}
+
+		@Override
+		public int readUnsignedByte() throws IOException {
+			return this.serializationReadBuffer.readUnsignedByte();
+		}
+
+		@Override
+		public short readShort() throws IOException {
+			return this.serializationReadBuffer.readShort();
+		}
+
+		@Override
+		public int readUnsignedShort() throws IOException {
+			return this.serializationReadBuffer.readUnsignedShort();
+		}
+
+		@Override
+		public char readChar() throws IOException {
+			return this.serializationReadBuffer.readChar();
+		}
+
+		@Override
+		public int readInt() throws IOException {
+			return this.serializationReadBuffer.readInt();
+		}
+
+		@Override
+		public long readLong() throws IOException {
+			return this.serializationReadBuffer.readLong();
+		}
+
+		@Override
+		public float readFloat() throws IOException {
+			return this.serializationReadBuffer.readFloat();
+		}
+
+		@Override
+		public double readDouble() throws IOException {
+			return this.serializationReadBuffer.readDouble();
+		}
+
+		@Override
+		public String readLine() throws IOException {
+			return this.serializationReadBuffer.readLine();
+		}
+
+		@Override
+		public String readUTF() throws IOException {
+			return this.serializationReadBuffer.readUTF();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java
new file mode 100644
index 0000000..e6479fe
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataInputDeserializer.java
@@ -0,0 +1,307 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.memory.MemoryUtils;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A simple and efficient deserializer for the {@link java.io.DataInput} interface.
+ */
+public class DataInputDeserializer implements DataInput {
+	
+	private byte[] buffer;
+	
+	private int end;
+
+	private int position;
+
+	public DataInputDeserializer() {
+	}
+	
+	public DataInputDeserializer(byte[] buffer, int start, int len) {
+		setBuffer(buffer, start, len);
+	}
+	
+	public DataInputDeserializer(ByteBuffer buffer) {
+		setBuffer(buffer);
+	}
+
+	public void setBuffer(ByteBuffer buffer) {
+		if (buffer.hasArray()) {
+			this.buffer = buffer.array();
+			this.position = buffer.arrayOffset() + buffer.position();
+			this.end = this.position + buffer.remaining();
+		} else if (buffer.isDirect()) {
+			this.buffer = new byte[buffer.remaining()];
+			this.position = 0;
+			this.end = this.buffer.length;
+
+			buffer.get(this.buffer);
+		} else {
+			throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
+		}
+	}
+
+	public void setBuffer(byte[] buffer, int start, int len) {
+		if (buffer == null)
+			throw new NullPointerException();
+
+		if (start < 0 || len < 0 || start + len >= buffer.length)
+			throw new IllegalArgumentException();
+
+		this.buffer = buffer;
+		this.position = start;
+		this.end = start * len;
+	}
+
+	// ----------------------------------------------------------------------------------------
+	//                               Data Input
+	// ----------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean readBoolean() throws IOException {
+		if (this.position < this.end) {
+			return this.buffer[this.position++] != 0;
+		} else {
+			throw new EOFException();
+		}
+	}
+
+	@Override
+	public byte readByte() throws IOException {
+		if (this.position < this.end) {
+			return this.buffer[this.position++];
+		} else {
+			throw new EOFException();
+		}
+	}
+
+	@Override
+	public char readChar() throws IOException {
+		if (this.position < this.end - 1) {
+			return (char) (((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0));
+		} else {
+			throw new EOFException();
+		}
+	}
+
+	@Override
+	public double readDouble() throws IOException {
+		return Double.longBitsToDouble(readLong());
+	}
+
+	@Override
+	public float readFloat() throws IOException {
+		return Float.intBitsToFloat(readInt());
+	}
+
+	@Override
+	public void readFully(byte[] b) throws IOException {
+		readFully(b, 0, b.length);
+	}
+
+	@Override
+	public void readFully(byte[] b, int off, int len) throws IOException {
+		if (len >= 0) {
+			if (off <= b.length - len) {
+				if (this.position <= this.end - len) {
+					System.arraycopy(this.buffer, position, b, off, len);
+					position += len;
+				} else {
+					throw new EOFException();
+				}
+			} else {
+				throw new ArrayIndexOutOfBoundsException();
+			}
+		} else if (len < 0) {
+			throw new IllegalArgumentException("Length may not be negative.");
+		}
+	}
+
+	@Override
+	public int readInt() throws IOException {
+		if (this.position >= 0 && this.position < this.end - 3) {
+			@SuppressWarnings("restriction")
+			int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
+			if (LITTLE_ENDIAN) {
+				 value = Integer.reverseBytes(value);
+			}
+			
+			this.position += 4;
+			return value;
+		} else {
+			throw new EOFException();
+		}
+	}
+
+	@Override
+	public String readLine() throws IOException {
+		if (this.position < this.end) {
+			// read until a newline is found
+			StringBuilder bld = new StringBuilder();
+			char curr = (char) readUnsignedByte();
+			while (position < this.end && curr != '\n') {
+				bld.append(curr);
+				curr = (char) readUnsignedByte();
+			}
+			// trim a trailing carriage return
+			int len = bld.length();
+			if (len > 0 && bld.charAt(len - 1) == '\r') {
+				bld.setLength(len - 1);
+			}
+			String s = bld.toString();
+			bld.setLength(0);
+			return s;
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	public long readLong() throws IOException {
+		if (position >= 0 && position < this.end - 7) {
+			@SuppressWarnings("restriction")
+			long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
+			if (LITTLE_ENDIAN) {
+				 value = Long.reverseBytes(value);
+			}
+			this.position += 8;
+			return value;
+		} else {
+			throw new EOFException();
+		}
+	}
+
+	@Override
+	public short readShort() throws IOException {
+		if (position >= 0 && position < this.end - 1) {
+			return (short) ((((this.buffer[position++]) & 0xff) << 8) | (((this.buffer[position++]) & 0xff) << 0));
+		} else {
+			throw new EOFException();
+		}
+	}
+
+	@Override
+	public String readUTF() throws IOException {
+		int utflen = readUnsignedShort();
+		byte[] bytearr = new byte[utflen];
+		char[] chararr = new char[utflen];
+
+		int c, char2, char3;
+		int count = 0;
+		int chararr_count = 0;
+
+		readFully(bytearr, 0, utflen);
+
+		while (count < utflen) {
+			c = (int) bytearr[count] & 0xff;
+			if (c > 127)
+				break;
+			count++;
+			chararr[chararr_count++] = (char) c;
+		}
+
+		while (count < utflen) {
+			c = (int) bytearr[count] & 0xff;
+			switch (c >> 4) {
+			case 0:
+			case 1:
+			case 2:
+			case 3:
+			case 4:
+			case 5:
+			case 6:
+			case 7:
+				/* 0xxxxxxx */
+				count++;
+				chararr[chararr_count++] = (char) c;
+				break;
+			case 12:
+			case 13:
+				/* 110x xxxx 10xx xxxx */
+				count += 2;
+				if (count > utflen)
+					throw new UTFDataFormatException("malformed input: partial character at end");
+				char2 = (int) bytearr[count - 1];
+				if ((char2 & 0xC0) != 0x80)
+					throw new UTFDataFormatException("malformed input around byte " + count);
+				chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+				break;
+			case 14:
+				/* 1110 xxxx 10xx xxxx 10xx xxxx */
+				count += 3;
+				if (count > utflen)
+					throw new UTFDataFormatException("malformed input: partial character at end");
+				char2 = (int) bytearr[count - 2];
+				char3 = (int) bytearr[count - 1];
+				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
+					throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+				chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+				break;
+			default:
+				/* 10xx xxxx, 1111 xxxx */
+				throw new UTFDataFormatException("malformed input around byte " + count);
+			}
+		}
+		// The number of chars produced may be less than utflen
+		return new String(chararr, 0, chararr_count);
+	}
+
+	@Override
+	public int readUnsignedByte() throws IOException {
+		if (this.position < this.end) {
+			return (this.buffer[this.position++] & 0xff);
+		} else {
+			throw new EOFException();
+		}
+	}
+
+	@Override
+	public int readUnsignedShort() throws IOException {
+		if (this.position < this.end - 1) {
+			return ((this.buffer[this.position++] & 0xff) << 8) | ((this.buffer[this.position++] & 0xff) << 0);
+		} else {
+			throw new EOFException();
+		}
+	}
+	
+	@Override
+	public int skipBytes(int n) throws IOException {
+		if (this.position <= this.end - n) {
+			this.position += n;
+			return n;
+		} else {
+			n = this.end - this.position;
+			this.position = this.end;
+			return n;
+		}
+	}
+	
+	@SuppressWarnings("restriction")
+	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+	
+	@SuppressWarnings("restriction")
+	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+	
+	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java
new file mode 100644
index 0000000..b5171b9
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/DataOutputSerializer.java
@@ -0,0 +1,259 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.memory.MemoryUtils;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A simple and efficient serializer for the {@link java.io.DataOutput} interface.
+ */
+public class DataOutputSerializer implements DataOutput {
+	
+	private byte[] buffer;
+	
+	private int position;
+
+	private ByteBuffer wrapper;
+	
+	public DataOutputSerializer(int startSize) {
+		if (startSize < 1) {
+			throw new IllegalArgumentException();
+		}
+
+		this.buffer = new byte[startSize];
+		this.wrapper = ByteBuffer.wrap(buffer);
+	}
+	
+	public ByteBuffer wrapAsByteBuffer() {
+		this.wrapper.position(0);
+		this.wrapper.limit(this.position);
+		return this.wrapper;
+	}
+
+	public void clear() {
+		this.position = 0;
+	}
+
+	public int length() {
+		return this.position;
+	}
+
+	@Override
+	public String toString() {
+		return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
+	}
+
+	// ----------------------------------------------------------------------------------------
+	//                               Data Output
+	// ----------------------------------------------------------------------------------------
+	
+	@Override
+	public void write(int b) throws IOException {
+		if (this.position >= this.buffer.length) {
+			resize(1);
+		}
+		this.buffer[this.position++] = (byte) (b & 0xff);
+	}
+
+	@Override
+	public void write(byte[] b) throws IOException {
+		write(b, 0, b.length);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		if (len < 0 || off > b.length - len) {
+			throw new ArrayIndexOutOfBoundsException();
+		}
+		if (this.position > this.buffer.length - len) {
+			resize(len);
+		}
+		System.arraycopy(b, off, this.buffer, this.position, len);
+		this.position += len;
+	}
+
+	@Override
+	public void writeBoolean(boolean v) throws IOException {
+		write(v ? 1 : 0);
+	}
+
+	@Override
+	public void writeByte(int v) throws IOException {
+		write(v);
+	}
+
+	@Override
+	public void writeBytes(String s) throws IOException {
+		final int sLen = s.length();
+		if (this.position >= this.buffer.length - sLen) {
+			resize(sLen);
+		}
+		
+		for (int i = 0; i < sLen; i++) {
+			writeByte(s.charAt(i));
+		}
+		this.position += sLen;
+	}
+
+	@Override
+	public void writeChar(int v) throws IOException {
+		if (this.position >= this.buffer.length - 1) {
+			resize(2);
+		}
+		this.buffer[this.position++] = (byte) (v >> 8);
+		this.buffer[this.position++] = (byte) v;
+	}
+
+	@Override
+	public void writeChars(String s) throws IOException {
+		final int sLen = s.length();
+		if (this.position >= this.buffer.length - 2*sLen) {
+			resize(2*sLen);
+		} 
+		for (int i = 0; i < sLen; i++) {
+			writeChar(s.charAt(i));
+		}
+	}
+
+	@Override
+	public void writeDouble(double v) throws IOException {
+		writeLong(Double.doubleToLongBits(v));
+	}
+
+	@Override
+	public void writeFloat(float v) throws IOException {
+		writeInt(Float.floatToIntBits(v));
+	}
+
+	@SuppressWarnings("restriction")
+	@Override
+	public void writeInt(int v) throws IOException {
+		if (this.position >= this.buffer.length - 3) {
+			resize(4);
+		}
+		if (LITTLE_ENDIAN) {
+			v = Integer.reverseBytes(v);
+		}			
+		UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
+		this.position += 4;
+	}
+
+	@SuppressWarnings("restriction")
+	@Override
+	public void writeLong(long v) throws IOException {
+		if (this.position >= this.buffer.length - 7) {
+			resize(8);
+		}
+		if (LITTLE_ENDIAN) {
+			v = Long.reverseBytes(v);
+		}
+		UNSAFE.putLong(this.buffer, BASE_OFFSET + this.position, v);
+		this.position += 8;
+	}
+
+	@Override
+	public void writeShort(int v) throws IOException {
+		if (this.position >= this.buffer.length - 1) {
+			resize(2);
+		}
+		this.buffer[this.position++] = (byte) ((v >>> 8) & 0xff);
+		this.buffer[this.position++] = (byte) ((v >>> 0) & 0xff);
+	}
+
+	@Override
+	public void writeUTF(String str) throws IOException {
+		int strlen = str.length();
+		int utflen = 0;
+		int c;
+
+		/* use charAt instead of copying String to char array */
+		for (int i = 0; i < strlen; i++) {
+			c = str.charAt(i);
+			if ((c >= 0x0001) && (c <= 0x007F)) {
+				utflen++;
+			} else if (c > 0x07FF) {
+				utflen += 3;
+			} else {
+				utflen += 2;
+			}
+		}
+
+		if (utflen > 65535)
+			throw new UTFDataFormatException("Encoded string is too long: " + utflen);
+		
+		else if (this.position > this.buffer.length - utflen - 2) {
+			resize(utflen + 2);
+		}
+		
+		byte[] bytearr = this.buffer;
+		int count = this.position;
+
+		bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
+		bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
+
+		int i = 0;
+		for (i = 0; i < strlen; i++) {
+			c = str.charAt(i);
+			if (!((c >= 0x0001) && (c <= 0x007F)))
+				break;
+			bytearr[count++] = (byte) c;
+		}
+
+		for (; i < strlen; i++) {
+			c = str.charAt(i);
+			if ((c >= 0x0001) && (c <= 0x007F)) {
+				bytearr[count++] = (byte) c;
+
+			} else if (c > 0x07FF) {
+				bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+				bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+				bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+			} else {
+				bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+				bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
+			}
+		}
+
+		this.position = count;
+	}
+	
+	
+	private final void resize(int minCapacityAdd) throws IOException {
+		try {
+			final int newLen = Math.max(this.buffer.length * 2, this.buffer.length + minCapacityAdd);
+			final byte[] nb = new byte[newLen];
+			System.arraycopy(this.buffer, 0, nb, 0, this.position);
+			this.buffer = nb;
+			this.wrapper = ByteBuffer.wrap(this.buffer);
+		}
+		catch (NegativeArraySizeException nasex) {
+			throw new IOException("Serialization failed because the record length would exceed 2GB (max addressable array size in Java).");
+		}
+	}
+	
+	@SuppressWarnings("restriction")
+	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+	
+	@SuppressWarnings("restriction")
+	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+	
+	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/RecordDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/RecordDeserializer.java
new file mode 100644
index 0000000..708c693
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/RecordDeserializer.java
@@ -0,0 +1,56 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.MemorySegment;
+
+import java.io.IOException;
+
+/**
+ * Interface for turning sequences of memory segments into records.
+ */
+public interface RecordDeserializer<T extends IOReadableWritable> {
+
+	public static enum DeserializationResult {
+		PARTIAL_RECORD(false, true),
+		INTERMEDIATE_RECORD_FROM_BUFFER(true, false),
+		LAST_RECORD_FROM_BUFFER(true, true);
+
+		private final boolean isFullRecord;
+
+		private final boolean isBufferConsumed;
+
+		private DeserializationResult(boolean isFullRecord, boolean isBufferConsumed) {
+			this.isFullRecord = isFullRecord;
+			this.isBufferConsumed = isBufferConsumed;
+		}
+
+		public boolean isFullRecord () {
+			return this.isFullRecord;
+		}
+
+		public boolean isBufferConsumed() {
+			return this.isBufferConsumed;
+		}
+	}
+	
+	DeserializationResult getNextRecord(T target) throws IOException;
+
+	void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException;
+
+	void clear();
+	
+	boolean hasUnfinishedData();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/RecordSerializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/RecordSerializer.java
new file mode 100644
index 0000000..b540e27
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/RecordSerializer.java
@@ -0,0 +1,60 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.runtime.io.Buffer;
+
+import java.io.IOException;
+
+/**
+ * Interface for turning records into sequences of memory segments.
+ */
+public interface RecordSerializer<T extends IOReadableWritable> {
+
+	public static enum SerializationResult {
+		PARTIAL_RECORD_MEMORY_SEGMENT_FULL(false, true),
+		FULL_RECORD_MEMORY_SEGMENT_FULL(true, true),
+		FULL_RECORD(true, false);
+		
+		private final boolean isFullRecord;
+
+		private final boolean isFullBuffer;
+		
+		private SerializationResult(boolean isFullRecord, boolean isFullBuffer) {
+			this.isFullRecord = isFullRecord;
+			this.isFullBuffer = isFullBuffer;
+		}
+		
+		public boolean isFullRecord() {
+			return this.isFullRecord;
+		}
+		
+		public boolean isFullBuffer() {
+			return this.isFullBuffer;
+		}
+	}
+	
+	SerializationResult addRecord(T record) throws IOException;
+
+	SerializationResult setNextBuffer(Buffer buffer) throws IOException;
+
+	Buffer getCurrentBuffer();
+	
+	void clear();
+	
+	boolean hasData();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java
new file mode 100644
index 0000000..443f8d8
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/serialization/SpanningRecordSerializer.java
@@ -0,0 +1,153 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.serialization;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.Buffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class SpanningRecordSerializer<T extends IOReadableWritable> implements RecordSerializer<T> {
+
+	/** Flag to enable/disable checks, if buffer not set/full or pending serialization */
+	private static final boolean CHECKED = true;
+
+	/** Intermediate data serialization */
+	private final DataOutputSerializer serializationBuffer;
+
+	/** Intermediate buffer for data serialization */
+	private ByteBuffer dataBuffer;
+
+	/** Intermediate buffer for length serialization */
+	private final ByteBuffer lengthBuffer;
+
+	/** Current target {@link eu.stratosphere.runtime.io.Buffer} of the serializer */
+	private Buffer targetBuffer;
+
+	/** Position in current {@link MemorySegment} of target buffer */
+	private int position;
+
+	/** Limit of current {@link MemorySegment} of target buffer */
+	private int limit;
+
+	public SpanningRecordSerializer() {
+		this.serializationBuffer = new DataOutputSerializer(128);
+
+		this.lengthBuffer = ByteBuffer.allocate(4);
+		this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
+
+		// ensure initial state with hasRemaining false (for correct setNextBuffer logic)
+		this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
+		this.lengthBuffer.position(4);
+	}
+
+	@Override
+	public SerializationResult addRecord(T record) throws IOException {
+		if (CHECKED) {
+			if (this.dataBuffer.hasRemaining()) {
+				throw new IllegalStateException("Pending serialization of previous record.");
+			}
+		}
+
+		this.serializationBuffer.clear();
+		this.lengthBuffer.clear();
+
+		// write data and length
+		record.write(this.serializationBuffer);
+		this.lengthBuffer.putInt(0, this.serializationBuffer.length());
+
+		this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
+
+		// Copy from intermediate buffers to current target memory segment
+		copyToTargetBufferFrom(this.lengthBuffer);
+		copyToTargetBufferFrom(this.dataBuffer);
+
+		return getSerializationResult();
+	}
+
+	@Override
+	public SerializationResult setNextBuffer(Buffer buffer) throws IOException {
+		this.targetBuffer = buffer;
+		this.position = 0;
+		this.limit = buffer.size();
+
+		if (this.lengthBuffer.hasRemaining()) {
+			copyToTargetBufferFrom(this.lengthBuffer);
+		}
+
+		if (this.dataBuffer.hasRemaining()) {
+			copyToTargetBufferFrom(this.dataBuffer);
+		}
+
+		return getSerializationResult();
+	}
+
+	/**
+	 * Copies as many bytes as possible from the given {@link ByteBuffer} to the {@link MemorySegment} of the target
+	 * {@link Buffer} and advances the current position by the number of written bytes.
+	 *
+	 * @param source the {@link ByteBuffer} to copy data from
+	 */
+	private void copyToTargetBufferFrom(ByteBuffer source) {
+		if (this.targetBuffer == null)
+			return;
+
+		int needed = source.remaining();
+		int available = this.limit - this.position;
+		int toCopy = Math.min(needed, available);
+
+		this.targetBuffer.getMemorySegment().put(this.position, source, toCopy);
+
+		this.position += toCopy;
+	}
+
+	private SerializationResult getSerializationResult() {
+		if (!this.dataBuffer.hasRemaining() && !this.lengthBuffer.hasRemaining()) {
+			return (this.position < this.limit)
+					? SerializationResult.FULL_RECORD
+					: SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
+		}
+
+		return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
+	}
+
+	@Override
+	public Buffer getCurrentBuffer() {
+		if (targetBuffer == null)
+			return null;
+
+		this.targetBuffer.limitSize(this.position);
+		return this.targetBuffer;
+	}
+
+	@Override
+	public void clear() {
+		this.targetBuffer = null;
+		this.position = 0;
+		this.limit = 0;
+
+		// ensure clear state with hasRemaining false (for correct setNextBuffer logic)
+		this.dataBuffer.position(this.dataBuffer.limit());
+		this.lengthBuffer.position(4);
+	}
+
+	@Override
+	public boolean hasData() {
+		// either data in current target buffer or intermediate buffers
+		return this.position > 0 || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/AbstractIDTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/AbstractIDTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/AbstractIDTest.java
new file mode 100644
index 0000000..65a7b19
--- /dev/null
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/AbstractIDTest.java
@@ -0,0 +1,62 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.nephele.util.CommonTestUtils;
+
+/**
+ * This class contains tests for the {@link eu.stratosphere.nephele.AbstractID} class.
+ * 
+ */
+public class AbstractIDTest {
+
+	/**
+	 * Tests the setID method of an abstract ID.
+	 */
+	@Test
+	public void testSetID() {
+
+		final ChannelID id1 = new ChannelID();
+		final ChannelID id2 = new ChannelID();
+		id1.setID(id2);
+
+		assertEquals(id1.hashCode(), id2.hashCode());
+		assertEquals(id1, id2);
+	}
+
+	/**
+	 * Tests the serialization/deserialization of an abstract ID.
+	 */
+	@Test
+	public void testSerialization() {
+
+		final ChannelID origID = new ChannelID();
+		try {
+			final ChannelID copyID = (ChannelID) CommonTestUtils.createCopy(origID);
+
+			assertEquals(origID.hashCode(), copyID.hashCode());
+			assertEquals(origID, copyID);
+
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptorTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptorTest.java
index 7cf83d7..b3ff279 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptorTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/ChannelDeploymentDescriptorTest.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 
 import org.junit.Test;
 
-import eu.stratosphere.nephele.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
 import eu.stratosphere.nephele.util.ServerTestUtils;
 import eu.stratosphere.util.StringUtils;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptorTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptorTest.java
index 66ccad8..bc7034f 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptorTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/GateDeploymentDescriptorTest.java
@@ -23,9 +23,9 @@ import java.util.List;
 
 import org.junit.Test;
 
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
+import eu.stratosphere.runtime.io.gates.GateID;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.util.ServerTestUtils;
 import eu.stratosphere.util.StringUtils;
 
@@ -42,7 +42,7 @@ public class GateDeploymentDescriptorTest {
 	public void testConstructorWithValidArguments() {
 
 		final GateID gateID = new GateID();
-		final ChannelType channelType = ChannelType.INMEMORY;
+		final ChannelType channelType = ChannelType.IN_MEMORY;
 		final List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(0);
 
 		final GateDeploymentDescriptor gdd = new GateDeploymentDescriptor(gateID, channelType, channels);
@@ -59,7 +59,7 @@ public class GateDeploymentDescriptorTest {
 	public void testConstructorWithInvalidArguments() {
 
 		final GateID gateID = new GateID();
-		final ChannelType channelType = ChannelType.INMEMORY;
+		final ChannelType channelType = ChannelType.IN_MEMORY;
 		final List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(0);
 
 		boolean firstExceptionCaught = false;
@@ -105,7 +105,7 @@ public class GateDeploymentDescriptorTest {
 	public void testSerialization() {
 
 		final GateID gateID = new GateID();
-		final ChannelType channelType = ChannelType.INMEMORY;
+		final ChannelType channelType = ChannelType.IN_MEMORY;
 		final List<ChannelDeploymentDescriptor> channels = new ArrayList<ChannelDeploymentDescriptor>(0);
 		final ChannelDeploymentDescriptor cdd = new ChannelDeploymentDescriptor(new ChannelID(), new ChannelID());
 		channels.add(cdd);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
index e2581f8..7000667 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/deployment/TaskDeploymentDescriptorTest.java
@@ -24,7 +24,7 @@ import org.junit.Test;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
 import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.io.library.FileLineReader;
+import eu.stratosphere.nephele.util.FileLineReader;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.template.AbstractInvokable;
 import eu.stratosphere.nephele.util.SerializableArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
index 25e538d..5ff5f1c 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ExecutionGraphTest.java
@@ -45,10 +45,10 @@ import eu.stratosphere.nephele.instance.InstanceRequestMap;
 import eu.stratosphere.nephele.instance.InstanceType;
 import eu.stratosphere.nephele.instance.InstanceTypeDescription;
 import eu.stratosphere.nephele.instance.InstanceTypeFactory;
-import eu.stratosphere.nephele.io.DistributionPattern;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.library.FileLineReader;
-import eu.stratosphere.nephele.io.library.FileLineWriter;
+import eu.stratosphere.nephele.jobgraph.DistributionPattern;
+import eu.stratosphere.runtime.io.channels.ChannelType;
+import eu.stratosphere.nephele.util.FileLineReader;
+import eu.stratosphere.nephele.util.FileLineWriter;
 import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
 import eu.stratosphere.nephele.jobgraph.JobFileOutputVertex;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
@@ -448,7 +448,7 @@ public class ExecutionGraphTest {
 	 * input1 -> task1 -> output1
 	 * no subtasks defined
 	 * input1 is default, task1 is m1.large, output1 is m1.xlarge
-	 * all channels are INMEMORY
+	 * all channels are IN_MEMORY
 	 */
 	@Test
 	public void testConvertJobGraphToExecutionGraph2() {
@@ -478,8 +478,8 @@ public class ExecutionGraphTest {
 			o1.setFilePath(new Path(new File(ServerTestUtils.getRandomFilename()).toURI()));
 
 			// connect vertices
-			i1.connectTo(t1, ChannelType.INMEMORY);
-			t1.connectTo(o1, ChannelType.INMEMORY);
+			i1.connectTo(t1, ChannelType.IN_MEMORY);
+			t1.connectTo(o1, ChannelType.IN_MEMORY);
 
 			LibraryCacheManager.register(jobID, new String[0]);
 
@@ -865,11 +865,11 @@ public class ExecutionGraphTest {
 			o1.setVertexToShareInstancesWith(o2);
 
 			// connect vertices
-			i1.connectTo(t1, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
-			i2.connectTo(t2, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+			i1.connectTo(t1, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
+			i2.connectTo(t2, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 			t1.connectTo(t3, ChannelType.NETWORK);
 			t2.connectTo(t3, ChannelType.NETWORK);
-			t3.connectTo(t4, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
+			t3.connectTo(t4, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
 			t4.connectTo(o1, ChannelType.NETWORK);
 			t4.connectTo(o2, ChannelType.NETWORK);
 
@@ -973,11 +973,11 @@ public class ExecutionGraphTest {
 			output.setNumberOfSubtasks(degreeOfParallelism);
 
 			// connect vertices
-			input.connectTo(cross, ChannelType.INMEMORY, 0, 0,
+			input.connectTo(cross, ChannelType.IN_MEMORY, 0, 0,
 				DistributionPattern.POINTWISE);
 			input.connectTo(cross, ChannelType.NETWORK, 1, 1,
 				DistributionPattern.BIPARTITE);
-			cross.connectTo(output, ChannelType.INMEMORY, 0, 0,
+			cross.connectTo(output, ChannelType.IN_MEMORY, 0, 0,
 				DistributionPattern.POINTWISE);
 
 			LibraryCacheManager.register(jobID, new String[0]);
@@ -1113,13 +1113,13 @@ public class ExecutionGraphTest {
 			output1.setNumberOfSubtasks(degreeOfParallelism);
 
 			// connect vertices
-			input1.connectTo(forward1, ChannelType.INMEMORY,
+			input1.connectTo(forward1, ChannelType.IN_MEMORY,
 				DistributionPattern.POINTWISE);
-			forward1.connectTo(forward2, ChannelType.INMEMORY,
+			forward1.connectTo(forward2, ChannelType.IN_MEMORY,
 				DistributionPattern.POINTWISE);
 			forward2.connectTo(forward3, ChannelType.NETWORK,
 				DistributionPattern.POINTWISE);
-			forward3.connectTo(output1, ChannelType.INMEMORY);
+			forward3.connectTo(output1, ChannelType.IN_MEMORY);
 
 			// setup instance sharing
 			input1.setVertexToShareInstancesWith(forward1);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input1Output.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input1Output.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input1Output.java
index 21341fa..0a2f52b 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input1Output.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input1Output.java
@@ -14,8 +14,8 @@
 package eu.stratosphere.nephele.executiongraph;
 
 import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.nephele.template.AbstractTask;
 
 public class ForwardTask1Input1Output extends AbstractTask {
@@ -26,18 +26,21 @@ public class ForwardTask1Input1Output extends AbstractTask {
 
 	@Override
 	public void invoke() throws Exception {
+		this.output.initializeSerializers();
 
 		while (this.input.hasNext()) {
 
 			StringRecord s = input.next();
 			this.output.emit(s);
 		}
+
+		this.output.flush();
 	}
 
 	@Override
 	public void registerInputOutput() {
 		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
+		this.output = new RecordWriter<StringRecord>(this);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input2Outputs.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input2Outputs.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input2Outputs.java
index 6582ea7..5a5c325 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input2Outputs.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask1Input2Outputs.java
@@ -14,8 +14,8 @@
 package eu.stratosphere.nephele.executiongraph;
 
 import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.nephele.template.AbstractTask;
 
 public class ForwardTask1Input2Outputs extends AbstractTask {
@@ -29,18 +29,24 @@ public class ForwardTask1Input2Outputs extends AbstractTask {
 	@Override
 	public void invoke() throws Exception {
 
+		this.output1.initializeSerializers();
+		this.output2.initializeSerializers();
+
 		while (this.input.hasNext()) {
 
 			StringRecord s = input.next();
 			this.output1.emit(s);
 			this.output2.emit(s);
 		}
+
+		this.output1.flush();
+		this.output2.flush();
 	}
 
 	@Override
 	public void registerInputOutput() {
 		this.input = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output1 = new RecordWriter<StringRecord>(this, StringRecord.class);
-		this.output2 = new RecordWriter<StringRecord>(this, StringRecord.class);
+		this.output1 = new RecordWriter<StringRecord>(this);
+		this.output2 = new RecordWriter<StringRecord>(this);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask2Inputs1Output.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask2Inputs1Output.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask2Inputs1Output.java
index 7e8af0b..c87d093 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask2Inputs1Output.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/ForwardTask2Inputs1Output.java
@@ -14,8 +14,8 @@
 package eu.stratosphere.nephele.executiongraph;
 
 import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.nephele.template.AbstractTask;
 
 public class ForwardTask2Inputs1Output extends AbstractTask {
@@ -28,6 +28,7 @@ public class ForwardTask2Inputs1Output extends AbstractTask {
 
 	@Override
 	public void invoke() throws Exception {
+		this.output.initializeSerializers();
 
 		while (this.input1.hasNext()) {
 
@@ -44,12 +45,14 @@ public class ForwardTask2Inputs1Output extends AbstractTask {
 				e.printStackTrace();
 			}
 		}
+
+		this.output.flush();
 	}
 
 	@Override
 	public void registerInputOutput() {
 		this.input1 = new RecordReader<StringRecord>(this, StringRecord.class);
 		this.input2 = new RecordReader<StringRecord>(this, StringRecord.class);
-		this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
+		this.output = new RecordWriter<StringRecord>(this);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossForwardTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossForwardTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossForwardTask.java
index c375fd4..05f181c 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossForwardTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossForwardTask.java
@@ -15,8 +15,8 @@ package eu.stratosphere.nephele.executiongraph;
 
 
 import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.runtime.io.api.RecordReader;
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.nephele.template.AbstractTask;
 
 /**
@@ -31,7 +31,7 @@ public class SelfCrossForwardTask extends AbstractTask {
 		
 		new RecordReader<StringRecord>(this, StringRecord.class);
 		new RecordReader<StringRecord>(this, StringRecord.class);
-		new RecordWriter<StringRecord>(this, StringRecord.class);
+		new RecordWriter<StringRecord>(this);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java
index a2af3a9..1ce23e6 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/executiongraph/SelfCrossInputTask.java
@@ -14,7 +14,7 @@
 package eu.stratosphere.nephele.executiongraph;
 
 import eu.stratosphere.core.io.StringRecord;
-import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.nephele.template.AbstractFileInputTask;
 
 /**
@@ -27,8 +27,8 @@ public class SelfCrossInputTask extends AbstractFileInputTask {
 	@Override
 	public void registerInputOutput() {
 
-		new RecordWriter<StringRecord>(this, StringRecord.class);
-		new RecordWriter<StringRecord>(this, StringRecord.class);
+		new RecordWriter<StringRecord>(this);
+		new RecordWriter<StringRecord>(this);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/fs/LineReaderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/fs/LineReaderTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/fs/LineReaderTest.java
deleted file mode 100644
index 00dd645..0000000
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/fs/LineReaderTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.fs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.PrintWriter;
-
-import org.junit.Test;
-
-import eu.stratosphere.core.fs.FSDataInputStream;
-import eu.stratosphere.core.fs.Path;
-import eu.stratosphere.core.fs.local.LocalFileSystem;
-import eu.stratosphere.nephele.util.CommonTestUtils;
-import eu.stratosphere.runtime.fs.LineReader;
-
-/**
- * This class tests the functionality of the LineReader class using a local filesystem.
- * 
- */
-
-public class LineReaderTest {
-
-	/**
-	 * This test tests the LineReader. So far only under usual conditions.
-	 */
-	@Test
-	public void testLineReader() {
-		final File testfile = new File(CommonTestUtils.getTempDir() + File.separator
-			+ CommonTestUtils.getRandomFilename());
-		final Path pathtotestfile = new Path(testfile.toURI().getPath());
-
-		try {
-			PrintWriter pw = new PrintWriter(testfile, "UTF8");
-
-			for (int i = 0; i < 100; i++) {
-				pw.append("line\n");
-			}
-			pw.close();
-
-			LocalFileSystem lfs = new LocalFileSystem();
-			FSDataInputStream fis = lfs.open(pathtotestfile);
-
-			// first, we test under "usual" conditions
-			final LineReader lr = new LineReader(fis, 0, testfile.length(), 256);
-
-			byte[] buffer;
-			int linecount = 0;
-			while ((buffer = lr.readLine()) != null) {
-				assertEquals(new String(buffer, "UTF8"), "line");
-				linecount++;
-			}
-			assertEquals(linecount, 100);
-
-			// the linereader can not handle situations with larger length than the total file...
-
-		} catch (Exception e) {
-			fail(e.toString());
-			e.printStackTrace();
-		} finally {
-			testfile.delete();
-		}
-
-	}
-
-}


Mime
View raw message