flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [13/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
Date Mon, 12 Jan 2015 08:16:21 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java
new file mode 100644
index 0000000..7255390
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+/**
+ * A record-oriented reader for immutable record types.
+ */
+public interface Reader<T extends IOReadableWritable> extends ReaderBase {
+
+	/**
+	 * Returns true if another record can be obtained via {@link #next()}.
+	 * May block until a record arrives or the input is exhausted (hence the
+	 * InterruptedException).
+	 */
+	boolean hasNext() throws IOException, InterruptedException;
+
+	/**
+	 * Returns the next record, or null if no further record is available.
+	 */
+	T next() throws IOException, InterruptedException;
+
+	/**
+	 * Releases internally held buffers.
+	 */
+	void clearBuffers();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
new file mode 100644
index 0000000..bb6ec44
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.util.event.EventListener;
+
+/**
+ * The basic API every reader (both buffer- and record-oriented) has to support.
+ */
+public interface ReaderBase {
+
+	// ------------------------------------------------------------------------
+	// Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns true when this reader has fully consumed its input.
+	 */
+	boolean isFinished();
+
+	// ------------------------------------------------------------------------
+	// Events
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Registers a listener for task events of the given type.
+	 */
+	void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType);
+
+	/**
+	 * Sends the given task event. May block, hence the InterruptedException.
+	 */
+	void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException;
+
+	// ------------------------------------------------------------------------
+	// Iterations
+	// ------------------------------------------------------------------------
+	// NOTE(review): the superstep semantics below are inferred from naming;
+	// confirm against the BufferReader implementation.
+
+	/**
+	 * Marks this reader as being used by an iterative task.
+	 */
+	void setIterativeReader();
+
+	/**
+	 * Signals that the next superstep begins.
+	 */
+	void startNextSuperstep();
+
+	/**
+	 * Returns true when all input of the current superstep has been consumed.
+	 */
+	boolean hasReachedEndOfSuperstep();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
new file mode 100644
index 0000000..db992a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+import java.io.IOException;
+
+/**
+ * A record-oriented reader that deserializes records of a fixed type from a
+ * {@link BufferReaderBase}, exposing them through the iterator-style
+ * {@link Reader} contract (hasNext()/next()).
+ */
+public class RecordReader<T extends IOReadableWritable> extends AbstractRecordReader<T> implements Reader<T> {
+
+	// Class object used to instantiate fresh record instances
+	private final Class<T> recordType;
+
+	// Record buffered by hasNext(), handed out by the following next() call
+	private T currentRecord;
+
+	public RecordReader(BufferReaderBase reader, Class<T> recordType) {
+		super(reader);
+
+		this.recordType = recordType;
+	}
+
+	@Override
+	public boolean hasNext() throws IOException, InterruptedException {
+		// a record read ahead by a previous call is still pending
+		if (currentRecord != null) {
+			return true;
+		}
+		else {
+			// read ahead: try to deserialize the next record into a new instance
+			T record = instantiateRecordType();
+			if (getNextRecord(record)) {
+				currentRecord = record;
+				return true;
+			}
+			else {
+				return false;
+			}
+		}
+	}
+
+	@Override
+	public T next() throws IOException, InterruptedException {
+		if (hasNext()) {
+			// hand out the record buffered by hasNext() and reset the look-ahead
+			T tmp = currentRecord;
+			currentRecord = null;
+			return tmp;
+		}
+		else {
+			// input exhausted
+			return null;
+		}
+	}
+
+	@Override
+	public void clearBuffers() {
+		super.clearBuffers();
+	}
+
+	// Creates a new record instance via the no-argument constructor.
+	private T instantiateRecordType() {
+		try {
+			return recordType.newInstance();
+		}
+		catch (InstantiationException e) {
+			throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e);
+		}
+		catch (IllegalAccessException e) {
+			throw new RuntimeException("Cannot instantiate class " + recordType.getName(), e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
new file mode 100644
index 0000000..b5cec0b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/UnionBufferReader.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * A buffer-oriented reader, which unions multiple {@link BufferReader}
+ * instances.
+ */
+public class UnionBufferReader implements BufferReaderBase, EventListener<BufferReader> {
+
+	// The readers whose input is unioned by this reader
+	private final BufferReader[] readers;
+
+	// Queue of readers that notified this union reader about available data
+	private final BlockingQueue<BufferReader> readersWithData = new LinkedBlockingQueue<BufferReader>();
+
+	// Set of readers, which are not closed yet
+	private final Set<BufferReader> remainingReaders;
+
+	// Logical channel index offset for each reader
+	private final Map<BufferReader, Integer> readerToIndexOffsetMap = new HashMap<BufferReader, Integer>();
+
+	// Total number of input channels across all unioned readers
+	private int totalNumInputChannels;
+
+	// Reader currently being drained (null => pick the next one from the queue)
+	private BufferReader currentReader;
+
+	private int currentReaderChannelIndexOffset;
+
+	// Union-wide channel index of the last buffer returned; -1 initially
+	private int channelIndexOfLastReadBuffer = -1;
+
+	private boolean isIterative;
+
+	// Guards the one-time partition request in getNextBuffer()
+	private boolean hasRequestedPartitions;
+
+	// NOTE(review): this flag is only ever assigned false below and never
+	// set to true, so isTaskEvent() always returns false. Confirm whether
+	// that is intentional.
+	private boolean isTaskEvent;
+
+	/**
+	 * Creates a union reader over the given buffer readers and registers
+	 * itself as a listener for their data-availability notifications.
+	 */
+	public UnionBufferReader(BufferReader... readers) {
+		checkNotNull(readers);
+		checkArgument(readers.length >= 2, "Union buffer reader must be initialized with at least two individual buffer readers");
+
+		this.readers = readers;
+		this.remainingReaders = new HashSet<BufferReader>(readers.length + 1, 1.0F);
+
+		int currentChannelIndexOffset = 0;
+
+		for (int i = 0; i < readers.length; i++) {
+			BufferReader reader = readers[i];
+
+			reader.subscribeToReader(this);
+
+			remainingReaders.add(reader);
+			// each reader's local channel indexes are mapped into a contiguous
+			// union-wide index range via this offset
+			readerToIndexOffsetMap.put(reader, currentChannelIndexOffset);
+
+			totalNumInputChannels += reader.getNumberOfInputChannels();
+			currentChannelIndexOffset += reader.getNumberOfInputChannels();
+		}
+	}
+
+	/**
+	 * Returns the next buffer from any of the unioned readers, or null when
+	 * all readers are finished, when a superstep ended, or when the chosen
+	 * reader returned no buffer.
+	 */
+	@Override
+	public Buffer getNextBuffer() throws IOException, InterruptedException {
+		// lazily request the partitions of all sub-readers exactly once
+		if (!hasRequestedPartitions) {
+			for (BufferReader reader : readers) {
+				reader.requestPartitionsOnce();
+			}
+
+			hasRequestedPartitions = true;
+		}
+
+		// note: every path inside the loop returns, so the body runs at most
+		// once per call
+		do {
+			if (currentReader == null) {
+				// Finished when all readers are finished
+				if (isFinished()) {
+					readersWithData.clear();
+					return null;
+				}
+				// Finished with superstep when all readers finished superstep
+				else if (isIterative && remainingReaders.isEmpty()) {
+					resetRemainingReaders();
+					return null;
+				}
+				else {
+					while (true) {
+						// blocks until some reader reported available data
+						currentReader = readersWithData.take();
+						currentReaderChannelIndexOffset = readerToIndexOffsetMap.get(currentReader);
+
+						if (isIterative && !remainingReaders.contains(currentReader)) {
+							// If the current reader already received its end
+							// of superstep event and notified the union reader
+							// about newer data *before* all other readers have
+							// done so, we delay this notifications.
+							readersWithData.add(currentReader);
+						}
+						else {
+							break;
+						}
+					}
+				}
+			}
+
+			Buffer buffer = currentReader.getNextBuffer();
+			// translate the reader-local channel index into the union-wide index
+			channelIndexOfLastReadBuffer = currentReaderChannelIndexOffset + currentReader.getChannelIndexOfLastBuffer();
+
+			isTaskEvent = false;
+
+			if (buffer == null) {
+				if (currentReader.isFinished() || currentReader.hasReachedEndOfSuperstep()) {
+					remainingReaders.remove(currentReader);
+				}
+
+				currentReader = null;
+
+				return null;
+			}
+			else {
+				currentReader = null;
+				return buffer;
+			}
+		} while (true);
+	}
+
+	@Override
+	public Buffer getNextBuffer(Buffer exchangeBuffer) throws IOException, InterruptedException {
+		throw new UnsupportedOperationException("Buffer exchange when reading data is not yet supported.");
+	}
+
+	@Override
+	public int getChannelIndexOfLastBuffer() {
+		return channelIndexOfLastReadBuffer;
+	}
+
+	@Override
+	public int getNumberOfInputChannels() {
+		return totalNumInputChannels;
+	}
+
+	@Override
+	public boolean isTaskEvent() {
+		return isTaskEvent;
+	}
+
+	// The union is finished only when every unioned reader is finished
+	@Override
+	public boolean isFinished() {
+		for (BufferReader reader : readers) {
+			if (!reader.isFinished()) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+
+	// Re-fills the remaining-readers set at the end of a superstep
+	private void resetRemainingReaders() {
+		checkState(isIterative, "Tried to reset remaining reader with non-iterative reader.");
+		checkState(remainingReaders.isEmpty(), "Tried to reset remaining readers, but there are some remaining readers.");
+		for (BufferReader reader : readers) {
+			remainingReaders.add(reader);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Notifications about available data
+	// ------------------------------------------------------------------------
+
+	// Called by a sub-reader when it has data available
+	@Override
+	public void onEvent(BufferReader readerWithData) {
+		readersWithData.add(readerWithData);
+	}
+
+	// ------------------------------------------------------------------------
+	// TaskEvents
+	// ------------------------------------------------------------------------
+
+	// Subscriptions and events are simply broadcast to all unioned readers
+	@Override
+	public void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) {
+		for (BufferReader reader : readers) {
+			reader.subscribeToTaskEvent(eventListener, eventType);
+		}
+	}
+
+	@Override
+	public void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException {
+		for (BufferReader reader : readers) {
+			reader.sendTaskEvent(event);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Iteration end of superstep events
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void setIterativeReader() {
+		isIterative = true;
+
+		for (BufferReader reader : readers) {
+			reader.setIterativeReader();
+		}
+	}
+
+	@Override
+	public void startNextSuperstep() {
+		for (BufferReader reader : readers) {
+			reader.startNextSuperstep();
+		}
+	}
+
+	// The end of a superstep is reached only when all readers reached it
+	@Override
+	public boolean hasReachedEndOfSuperstep() {
+		for (BufferReader reader : readers) {
+			if (!reader.hasReachedEndOfSuperstep()) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
new file mode 100644
index 0000000..9eda286
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A record deserializer that handles both records which are fully contained
+ * in a single buffer and records which span multiple buffers.
+ *
+ * @param <T> The type of the record to be deserialized.
+ */
+public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> {
+
+	private final NonSpanningWrapper nonSpanningWrapper;
+
+	private final SpanningWrapper spanningWrapper;
+
+	private Buffer currentBuffer;
+
+	public AdaptiveSpanningRecordDeserializer() {
+		this.nonSpanningWrapper = new NonSpanningWrapper();
+		this.spanningWrapper = new SpanningWrapper();
+	}
+
+	@Override
+	public void setNextBuffer(Buffer buffer) throws IOException {
+		currentBuffer = buffer;
+
+		MemorySegment segment = buffer.getMemorySegment();
+		int numBytes = buffer.getSize();
+
+		setNextMemorySegment(segment, numBytes);
+	}
+
+	@Override
+	public Buffer getCurrentBuffer () {
+		Buffer tmp = currentBuffer;
+		currentBuffer = null;
+		return tmp;
+	}
+
+	@Override
+	public void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException {
+		// check if some spanning record deserialization is pending
+		if (this.spanningWrapper.getNumGatheredBytes() > 0) {
+			this.spanningWrapper.addNextChunkFromMemorySegment(segment, numBytes);
+		}
+		else {
+			this.nonSpanningWrapper.initializeFromMemorySegment(segment, 0, numBytes);
+		}
+	}
+
+	/**
+	 * Deserializes the next record into the given target, reporting whether a
+	 * full record was read, whether it was the last one in the current buffer,
+	 * or whether the record still spans into not-yet-received buffers.
+	 */
+	@Override
+	public DeserializationResult getNextRecord(T target) throws IOException {
+		// always check the non-spanning wrapper first.
+		// this should be the majority of the cases for small records
+		// for large records, this portion of the work is very small in comparison anyways
+
+		int nonSpanningRemaining = this.nonSpanningWrapper.remaining();
+
+		// check if we can get a full length;
+		if (nonSpanningRemaining >= 4) {
+			int len = this.nonSpanningWrapper.readInt();
+
+			if (len <= nonSpanningRemaining - 4) {
+				// we can get a full record from here
+				target.read(this.nonSpanningWrapper);
+
+				return (this.nonSpanningWrapper.remaining() == 0) ?
+						DeserializationResult.LAST_RECORD_FROM_BUFFER :
+						DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+			} else {
+				// we got the length, but we need the rest from the spanning deserializer
+				// and need to wait for more buffers
+				this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len);
+				this.nonSpanningWrapper.clear();
+				return DeserializationResult.PARTIAL_RECORD;
+			}
+		} else if (nonSpanningRemaining > 0) {
+			// we have an incomplete length
+			// add our part of the length to the length buffer
+			this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper);
+			this.nonSpanningWrapper.clear();
+			return DeserializationResult.PARTIAL_RECORD;
+		}
+
+		// spanning record case
+		if (this.spanningWrapper.hasFullRecord()) {
+			// get the full record
+			target.read(this.spanningWrapper);
+
+			// move the remainder to the non-spanning wrapper
+			// this does not copy it, only sets the memory segment
+			this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
+			this.spanningWrapper.clear();
+
+			return (this.nonSpanningWrapper.remaining() == 0) ?
+					DeserializationResult.LAST_RECORD_FROM_BUFFER :
+					DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+		} else {
+			// still gathering bytes for a spanning record; wait for more buffers
+			return DeserializationResult.PARTIAL_RECORD;
+		}
+	}
+
+	@Override
+	public void clear() {
+		this.nonSpanningWrapper.clear();
+		this.spanningWrapper.clear();
+	}
+
+	@Override
+	public boolean hasUnfinishedData() {
+		return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	private static final class NonSpanningWrapper implements DataInputView {
+
+		private MemorySegment segment;
+
+		private int limit;
+
+		private int position;
+
+		private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
+		private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
+
+		int remaining() {
+			return this.limit - this.position;
+		}
+
+		void clear() {
+			this.segment = null;
+			this.limit = 0;
+			this.position = 0;
+		}
+
+		void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
+			this.segment = seg;
+			this.position = position;
+			this.limit = leftOverLimit;
+		}
+
+		// -------------------------------------------------------------------------------------------------------------
+		//                                       DataInput specific methods
+		// -------------------------------------------------------------------------------------------------------------
+
+		@Override
+		public final void readFully(byte[] b) throws IOException {
+			readFully(b, 0, b.length);
+		}
+
+		@Override
+		public final void readFully(byte[] b, int off, int len) throws IOException {
+			if (off < 0 || len < 0 || off + len > b.length) {
+				throw new IndexOutOfBoundsException();
+			}
+
+			this.segment.get(this.position, b, off, len);
+			this.position += len;
+		}
+
+		@Override
+		public final boolean readBoolean() throws IOException {
+			return readByte() == 1;
+		}
+
+		@Override
+		public final byte readByte() throws IOException {
+			return this.segment.get(this.position++);
+		}
+
+		@Override
+		public final int readUnsignedByte() throws IOException {
+			return readByte() & 0xff;
+		}
+
+		@Override
+		public final short readShort() throws IOException {
+			final short v = this.segment.getShort(this.position);
+			this.position += 2;
+			return v;
+		}
+
+		@Override
+		public final int readUnsignedShort() throws IOException {
+			final int v = this.segment.getShort(this.position) & 0xffff;
+			this.position += 2;
+			return v;
+		}
+
+		@Override
+		public final char readChar() throws IOException  {
+			final char v = this.segment.getChar(this.position);
+			this.position += 2;
+			return v;
+		}
+
+		@Override
+		public final int readInt() throws IOException {
+			final int v = this.segment.getIntBigEndian(this.position);
+			this.position += 4;
+			return v;
+		}
+
+		@Override
+		public final long readLong() throws IOException {
+			final long v = this.segment.getLongBigEndian(this.position);
+			this.position += 8;
+			return v;
+		}
+
+		@Override
+		public final float readFloat() throws IOException {
+			return Float.intBitsToFloat(readInt());
+		}
+
+		@Override
+		public final double readDouble() throws IOException {
+			return Double.longBitsToDouble(readLong());
+		}
+
+		@Override
+		public final String readLine() throws IOException {
+			final StringBuilder bld = new StringBuilder(32);
+
+			try {
+				int b;
+				while ((b = readUnsignedByte()) != '\n') {
+					if (b != '\r') {
+						bld.append((char) b);
+					}
+				}
+			}
+			catch (EOFException eofex) {}
+
+			if (bld.length() == 0) {
+				return null;
+			}
+
+			// trim a trailing carriage return
+			int len = bld.length();
+			if (len > 0 && bld.charAt(len - 1) == '\r') {
+				bld.setLength(len - 1);
+			}
+			return bld.toString();
+		}
+
+		@Override
+		public final String readUTF() throws IOException {
+			final int utflen = readUnsignedShort();
+
+			final byte[] bytearr;
+			final char[] chararr;
+
+			if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
+				bytearr = new byte[utflen];
+				this.utfByteBuffer = bytearr;
+			} else {
+				bytearr = this.utfByteBuffer;
+			}
+			if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
+				chararr = new char[utflen];
+				this.utfCharBuffer = chararr;
+			} else {
+				chararr = this.utfCharBuffer;
+			}
+
+			int c, char2, char3;
+			int count = 0;
+			int chararr_count = 0;
+
+			readFully(bytearr, 0, utflen);
+
+			while (count < utflen) {
+				c = (int) bytearr[count] & 0xff;
+				if (c > 127) {
+					break;
+				}
+				count++;
+				chararr[chararr_count++] = (char) c;
+			}
+
+			while (count < utflen) {
+				c = (int) bytearr[count] & 0xff;
+				switch (c >> 4) {
+					case 0:
+					case 1:
+					case 2:
+					case 3:
+					case 4:
+					case 5:
+					case 6:
+					case 7:
+						count++;
+						chararr[chararr_count++] = (char) c;
+						break;
+					case 12:
+					case 13:
+						count += 2;
+						if (count > utflen) {
+							throw new UTFDataFormatException("malformed input: partial character at end");
+						}
+						char2 = (int) bytearr[count - 1];
+						if ((char2 & 0xC0) != 0x80) {
+							throw new UTFDataFormatException("malformed input around byte " + count);
+						}
+						chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+						break;
+					case 14:
+						count += 3;
+						if (count > utflen) {
+							throw new UTFDataFormatException("malformed input: partial character at end");
+						}
+						char2 = (int) bytearr[count - 2];
+						char3 = (int) bytearr[count - 1];
+						if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+							throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+						}
+						chararr[chararr_count++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
+						break;
+					default:
+						throw new UTFDataFormatException("malformed input around byte " + count);
+				}
+			}
+			// The number of chars produced may be less than utflen
+			return new String(chararr, 0, chararr_count);
+		}
+
+		@Override
+		public final int skipBytes(int n) throws IOException {
+			if (n < 0) {
+				throw new IllegalArgumentException();
+			}
+
+			int toSkip = Math.min(n, remaining());
+			this.position += toSkip;
+			return toSkip;
+		}
+
+		@Override
+		public void skipBytesToRead(int numBytes) throws IOException {
+			int skippedBytes = skipBytes(numBytes);
+
+			if(skippedBytes < numBytes){
+				throw new EOFException("Could not skip " + numBytes + " bytes.");
+			}
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			if(b == null){
+				throw new NullPointerException("Byte array b cannot be null.");
+			}
+
+			if(off < 0){
+				throw new IllegalArgumentException("The offset off cannot be negative.");
+			}
+
+			if(len < 0){
+				throw new IllegalArgumentException("The length len cannot be negative.");
+			}
+
+			int toRead = Math.min(len, remaining());
+			this.segment.get(this.position,b,off, toRead);
+			this.position += toRead;
+
+			return toRead;
+		}
+
+		@Override
+		public int read(byte[] b) throws IOException {
+			return read(b, 0, b.length);
+		}
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	private static final class SpanningWrapper implements DataInputView {
+
+		private final DataOutputSerializer serializationBuffer;
+
+		private final DataInputDeserializer serializationReadBuffer;
+
+		private final ByteBuffer lengthBuffer;
+
+		private int recordLength;
+
+		private MemorySegment leftOverData;
+
+		private int leftOverStart;
+
+		private int leftOverLimit;
+
+		private int recordLimit;
+
+		public SpanningWrapper() {
+			this.lengthBuffer = ByteBuffer.allocate(4);
+			this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
+
+			this.recordLength = -1;
+
+			this.serializationBuffer = new DataOutputSerializer(1024);
+			this.serializationReadBuffer = new DataInputDeserializer();
+		}
+
+		private void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
+			// set the length and copy what is available to the buffer
+			this.recordLength = nextRecordLength;
+			this.recordLimit = partial.remaining();
+			partial.segment.get(this.serializationBuffer, partial.position, partial.remaining());
+			this.serializationReadBuffer.setBuffer(this.serializationBuffer.wrapAsByteBuffer());
+		}
+
+		private void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException {
+			// copy what we have to the length buffer
+			partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
+		}
+
+		// Appends the payload of the given segment to the partially gathered
+		// spanning record, completing a partially received length field first
+		// if necessary. Bytes beyond the current record's end are remembered
+		// as left-over data for the non-spanning wrapper.
+		private void addNextChunkFromMemorySegment(MemorySegment segment, int numBytesInSegment) throws IOException {
+			int segmentPosition = 0;
+
+			// check where to go. if we have a partial length, we need to complete it first
+			if (this.lengthBuffer.position() > 0) {
+				int toPut = Math.min(this.lengthBuffer.remaining(), numBytesInSegment);
+				segment.get(0, this.lengthBuffer, toPut);
+
+				// did we complete the length?
+				if (this.lengthBuffer.hasRemaining()) {
+					// still not enough bytes for the 4-byte length field
+					return;
+				} else {
+					this.recordLength = this.lengthBuffer.getInt(0);
+
+					this.lengthBuffer.clear();
+					segmentPosition = toPut;
+				}
+			}
+
+			// copy as much as we need or can for this next spanning record
+			int needed = this.recordLength - this.recordLimit;
+			int available = numBytesInSegment - segmentPosition;
+			int toCopy = Math.min(needed, available);
+
+			segment.get(this.serializationBuffer, segmentPosition, toCopy);
+			this.recordLimit += toCopy;
+
+			if (toCopy < available) {
+				// there is more data in the segment
+				this.leftOverData = segment;
+				this.leftOverStart = segmentPosition + toCopy;
+				this.leftOverLimit = numBytesInSegment;
+			}
+
+			// update read view
+			this.serializationReadBuffer.setBuffer(this.serializationBuffer.wrapAsByteBuffer());
+		}
+
+		private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
+			deserializer.clear();
+
+			if (leftOverData != null) {
+				deserializer.initializeFromMemorySegment(leftOverData, leftOverStart, leftOverLimit);
+			}
+		}
+
+		private boolean hasFullRecord() {
+			return this.recordLength >= 0 && this.recordLimit >= this.recordLength;
+		}
+
+		private int getNumGatheredBytes() {
+			return this.recordLimit + (this.recordLength >= 0 ? 4 : lengthBuffer.position()) + this.serializationBuffer.length();
+		}
+
+		public void clear() {
+			this.serializationBuffer.clear();
+
+			this.recordLength = -1;
+			this.lengthBuffer.clear();
+			this.leftOverData = null;
+			this.recordLimit = 0;
+		}
+
+		// -------------------------------------------------------------------------------------------------------------
+		//                                       DataInput specific methods
+		// -------------------------------------------------------------------------------------------------------------
+
+		@Override
+		public void readFully(byte[] b) throws IOException {
+			this.serializationReadBuffer.readFully(b);
+		}
+
+		@Override
+		public void readFully(byte[] b, int off, int len) throws IOException {
+			this.serializationReadBuffer.readFully(b, off, len);
+		}
+
+		@Override
+		public int skipBytes(int n) throws IOException {
+			return this.serializationReadBuffer.skipBytes(n);
+		}
+
+		@Override
+		public boolean readBoolean() throws IOException {
+			return this.serializationReadBuffer.readBoolean();
+		}
+
+		@Override
+		public byte readByte() throws IOException {
+			return this.serializationReadBuffer.readByte();
+		}
+
+		@Override
+		public int readUnsignedByte() throws IOException {
+			return this.serializationReadBuffer.readUnsignedByte();
+		}
+
+		@Override
+		public short readShort() throws IOException {
+			return this.serializationReadBuffer.readShort();
+		}
+
+		@Override
+		public int readUnsignedShort() throws IOException {
+			return this.serializationReadBuffer.readUnsignedShort();
+		}
+
+		@Override
+		public char readChar() throws IOException {
+			return this.serializationReadBuffer.readChar();
+		}
+
+		@Override
+		public int readInt() throws IOException {
+			return this.serializationReadBuffer.readInt();
+		}
+
+		@Override
+		public long readLong() throws IOException {
+			return this.serializationReadBuffer.readLong();
+		}
+
+		@Override
+		public float readFloat() throws IOException {
+			return this.serializationReadBuffer.readFloat();
+		}
+
+		@Override
+		public double readDouble() throws IOException {
+			return this.serializationReadBuffer.readDouble();
+		}
+
+		@Override
+		public String readLine() throws IOException {
+			return this.serializationReadBuffer.readLine();
+		}
+
+		@Override
+		public String readUTF() throws IOException {
+			return this.serializationReadBuffer.readUTF();
+		}
+
+		@Override
+		public void skipBytesToRead(int numBytes) throws IOException {
+			this.serializationReadBuffer.skipBytesToRead(numBytes);
+		}
+
+		@Override
+		public int read(byte[] b, int off, int len) throws IOException {
+			return this.serializationReadBuffer.read(b, off, len);
+		}
+
+		@Override
+		public int read(byte[] b) throws IOException {
+			return this.serializationReadBuffer.read(b);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
new file mode 100644
index 0000000..76c88c1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class EventSerializer {
+
+	public final static BufferRecycler RECYCLER = new BufferRecycler() {
+		@Override
+		public void recycle(MemorySegment memorySegment) {
+			memorySegment.free();
+		}
+	};
+
+	public static ByteBuffer toSerializedEvent(AbstractEvent event) {
+		try {
+			final DataOutputSerializer serializer = new DataOutputSerializer(128);
+
+			serializer.writeUTF(event.getClass().getName());
+			event.write(serializer);
+
+			return serializer.wrapAsByteBuffer();
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Error while serializing event.", e);
+		}
+	}
+
+	public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) {
+		try {
+			final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
+
+			final String className = deserializer.readUTF();
+
+			final Class<? extends AbstractEvent> clazz;
+			try {
+				clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
+			}
+			catch (ClassNotFoundException e) {
+				throw new RuntimeException("Could not load event class '" + className + "'.", e);
+			}
+			catch (ClassCastException e) {
+				throw new RuntimeException("The class '" + className + "' is not a valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
+			}
+
+			final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
+			event.read(deserializer);
+
+			return event;
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Error while deserializing event.", e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Buffer helpers
+	// ------------------------------------------------------------------------
+
+	public static Buffer toBuffer(AbstractEvent event) {
+		final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
+
+		final Buffer buffer = new Buffer(new MemorySegment(serializedEvent.array()), RECYCLER, false);
+		buffer.setSize(serializedEvent.remaining());
+
+		return buffer;
+	}
+
+	public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) {
+		return fromSerializedEvent(buffer.getNioBuffer(), classLoader);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
new file mode 100644
index 0000000..dd8ea06
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+/**
+ * Interface for turning sequences of memory segments into records.
+ */
+public interface RecordDeserializer<T extends IOReadableWritable> {
+
+	public static enum DeserializationResult {
+		PARTIAL_RECORD(false, true),
+		INTERMEDIATE_RECORD_FROM_BUFFER(true, false),
+		LAST_RECORD_FROM_BUFFER(true, true);
+
+		private final boolean isFullRecord;
+
+		private final boolean isBufferConsumed;
+
+		private DeserializationResult(boolean isFullRecord, boolean isBufferConsumed) {
+			this.isFullRecord = isFullRecord;
+			this.isBufferConsumed = isBufferConsumed;
+		}
+
+		public boolean isFullRecord () {
+			return this.isFullRecord;
+		}
+
+		public boolean isBufferConsumed() {
+			return this.isBufferConsumed;
+		}
+	}
+	
+	DeserializationResult getNextRecord(T target) throws IOException;
+
+	void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException;
+
+	void setNextBuffer(Buffer buffer) throws IOException;
+
+	Buffer getCurrentBuffer();
+
+	void clear();
+	
+	boolean hasUnfinishedData();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
new file mode 100644
index 0000000..6268218
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+/**
+ * Interface for turning records into sequences of memory segments.
+ */
+public interface RecordSerializer<T extends IOReadableWritable> {
+
+	public static enum SerializationResult {
+		PARTIAL_RECORD_MEMORY_SEGMENT_FULL(false, true),
+		FULL_RECORD_MEMORY_SEGMENT_FULL(true, true),
+		FULL_RECORD(true, false);
+		
+		private final boolean isFullRecord;
+
+		private final boolean isFullBuffer;
+		
+		private SerializationResult(boolean isFullRecord, boolean isFullBuffer) {
+			this.isFullRecord = isFullRecord;
+			this.isFullBuffer = isFullBuffer;
+		}
+		
+		public boolean isFullRecord() {
+			return this.isFullRecord;
+		}
+		
+		public boolean isFullBuffer() {
+			return this.isFullBuffer;
+		}
+	}
+	
+	SerializationResult addRecord(T record) throws IOException;
+
+	SerializationResult setNextBuffer(Buffer buffer) throws IOException;
+
+	Buffer getCurrentBuffer();
+	
+	void clear();
+	
+	boolean hasData();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
new file mode 100644
index 0000000..9d0256f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.serialization;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
/**
 * A {@link RecordSerializer} that writes each record as a 4 byte length header followed
 * by the serialized record data, copying the result into target {@link Buffer}s. A
 * record that does not fit into the current target buffer spans over to the next
 * buffer(s) supplied via {@link #setNextBuffer(Buffer)}.
 */
public class SpanningRecordSerializer<T extends IOReadableWritable> implements RecordSerializer<T> {

	/** Flag to enable/disable checks, if buffer not set/full or pending serialization */
	private static final boolean CHECKED = false;

	/** Intermediate data serialization */
	private final DataOutputSerializer serializationBuffer;

	/** Intermediate buffer for data serialization */
	private ByteBuffer dataBuffer;

	/** Intermediate buffer for length serialization */
	private final ByteBuffer lengthBuffer;

	/** Current target {@link Buffer} of the serializer */
	private Buffer targetBuffer;

	/** Position in current {@link MemorySegment} of target buffer */
	private int position;

	/** Limit of current {@link MemorySegment} of target buffer */
	private int limit;

	public SpanningRecordSerializer() {
		this.serializationBuffer = new DataOutputSerializer(128);

		// the length header is always 4 bytes, big endian
		this.lengthBuffer = ByteBuffer.allocate(4);
		this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);

		// ensure initial state with hasRemaining false (for correct setNextBuffer logic)
		this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();
		this.lengthBuffer.position(4);
	}

	/**
	 * Serializes the record into the intermediate buffers and copies as much of it
	 * (header first, then data) as fits into the current target buffer.
	 *
	 * @param record the record to serialize
	 * @return whether the record was fully written and whether the target buffer is full
	 * @throws IOException if the record fails to write itself
	 */
	@Override
	public SerializationResult addRecord(T record) throws IOException {
		if (CHECKED) {
			if (this.dataBuffer.hasRemaining()) {
				throw new IllegalStateException("Pending serialization of previous record.");
			}
		}

		this.serializationBuffer.clear();
		this.lengthBuffer.clear();

		// write data and length
		record.write(this.serializationBuffer);

		// absolute put: writes at index 0 without advancing the position, so all
		// 4 header bytes remain "remaining" for the copy below
		this.lengthBuffer.putInt(0, this.serializationBuffer.length());

		this.dataBuffer = this.serializationBuffer.wrapAsByteBuffer();

		// Copy from intermediate buffers to current target memory segment
		copyToTargetBufferFrom(this.lengthBuffer);
		copyToTargetBufferFrom(this.dataBuffer);

		return getSerializationResult();
	}

	/**
	 * Sets a new target buffer and continues copying any pending (spanning) length
	 * header and record data into it.
	 *
	 * @param buffer the new target buffer
	 * @return the serialization state after copying the pending bytes (if any)
	 */
	@Override
	public SerializationResult setNextBuffer(Buffer buffer) throws IOException {
		this.targetBuffer = buffer;
		this.position = 0;
		this.limit = buffer.getSize();

		if (this.lengthBuffer.hasRemaining()) {
			copyToTargetBufferFrom(this.lengthBuffer);
		}

		if (this.dataBuffer.hasRemaining()) {
			copyToTargetBufferFrom(this.dataBuffer);
		}

		return getSerializationResult();
	}

	/**
	 * Copies as many bytes as possible from the given {@link ByteBuffer} to the {@link MemorySegment} of the target
	 * {@link Buffer} and advances the current position by the number of written bytes.
	 *
	 * @param source the {@link ByteBuffer} to copy data from
	 */
	private void copyToTargetBufferFrom(ByteBuffer source) {
		// no target buffer set yet (e.g. addRecord before the first setNextBuffer):
		// leave the bytes pending in the intermediate buffers
		if (this.targetBuffer == null) {
			return;
		}

		int needed = source.remaining();
		int available = this.limit - this.position;
		int toCopy = Math.min(needed, available);

		this.targetBuffer.getMemorySegment().put(this.position, source, toCopy);

		this.position += toCopy;
	}

	/**
	 * Derives the result from the intermediate buffers: pending header or data bytes
	 * mean the record only partially fit; otherwise the record is complete and the
	 * result depends on whether the target buffer has room left.
	 */
	private SerializationResult getSerializationResult() {
		if (!this.dataBuffer.hasRemaining() && !this.lengthBuffer.hasRemaining()) {
			return (this.position < this.limit)
					? SerializationResult.FULL_RECORD
					: SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
		}

		return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
	}

	/**
	 * Returns the current target buffer, sized to the number of bytes written so far,
	 * or {@code null} if no target buffer is set.
	 */
	@Override
	public Buffer getCurrentBuffer() {
		if (targetBuffer == null) {
			return null;
		}

		this.targetBuffer.setSize(this.position);
		return this.targetBuffer;
	}

	/**
	 * Resets the serializer: drops the reference to the target buffer and marks the
	 * intermediate buffers as fully consumed.
	 */
	@Override
	public void clear() {
		this.targetBuffer = null;
		this.position = 0;
		this.limit = 0;

		// ensure clear state with hasRemaining false (for correct setNextBuffer logic)
		this.dataBuffer.position(this.dataBuffer.limit());
		this.lengthBuffer.position(4);
	}

	/**
	 * Returns whether the serializer holds unflushed data, either already copied into
	 * the target buffer or still pending in the intermediate buffers.
	 */
	@Override
	public boolean hasData() {
		// either data in current target buffer or intermediate buffers
		return this.position > 0 || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining());
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BufferWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BufferWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BufferWriter.java
new file mode 100644
index 0000000..6cb1831
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BufferWriter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.util.event.EventNotificationHandler;
+
+import java.io.IOException;
+
+/**
+ * A buffer-oriented runtime result writer.
+ * <p>
+ * The {@link BufferWriter} is the runtime API for producing results. It
+ * supports two kinds of data to be sent: buffers and events.
+ * <p>
+ * <strong>Important</strong>: When working directly with this API, it is
+ * necessary to call {@link #finish()} after all data has been produced.
+ */
+public final class BufferWriter implements EventListener<TaskEvent> {
+
+	private final IntermediateResultPartition partition;
+
+	private final EventNotificationHandler<TaskEvent> taskEventHandler = new EventNotificationHandler<TaskEvent>();
+
+	public BufferWriter(IntermediateResultPartition partition) {
+		this.partition = partition;
+	}
+
+	// ------------------------------------------------------------------------
+	// Attributes
+	// ------------------------------------------------------------------------
+
+	public IntermediateResultPartitionID getPartitionId() {
+		return partition.getPartitionId();
+	}
+
+	public BufferProvider getBufferProvider() {
+		return partition.getBufferProvider();
+	}
+
+	public int getNumberOfOutputChannels() {
+		return partition.getNumberOfQueues();
+	}
+
+	// ------------------------------------------------------------------------
+	// Data processing
+	// ------------------------------------------------------------------------
+
+	public void writeBuffer(Buffer buffer, int targetChannel) throws IOException {
+		partition.add(buffer, targetChannel);
+	}
+
+	public void writeEvent(AbstractEvent event, int targetChannel) throws IOException {
+		partition.add(EventSerializer.toBuffer(event), targetChannel);
+	}
+
+	public void writeEventToAllChannels(AbstractEvent event) throws IOException {
+		for (int i = 0; i < partition.getNumberOfQueues(); i++) {
+			Buffer buffer = EventSerializer.toBuffer(event);
+			partition.add(buffer, i);
+		}
+	}
+
+	public void writeEndOfSuperstep() throws IOException {
+		for (int i = 0; i < partition.getNumberOfQueues(); i++) {
+			Buffer buffer = EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE);
+			partition.add(buffer, i);
+		}
+	}
+
+	public void finish() throws IOException, InterruptedException {
+		partition.finish();
+	}
+
+	public boolean isFinished() {
+		return partition.isFinished();
+	}
+
+	// ------------------------------------------------------------------------
+	// Event handling
+	// ------------------------------------------------------------------------
+
+	public EventNotificationHandler<TaskEvent> getTaskEventHandler() {
+		return taskEventHandler;
+	}
+
+	public void subscribeToEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) {
+		taskEventHandler.subscribe(eventListener, eventType);
+	}
+
+	@Override
+	public void onEvent(TaskEvent event) {
+		taskEventHandler.publish(event);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelector.java
new file mode 100644
index 0000000..65012fe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelector.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+/**
+ * The {@link ChannelSelector} determines to which logical channels a record
+ * should be written to.
+ *
+ * @param <T> the type of record which is sent through the attached output gate
+ */
+public interface ChannelSelector<T extends IOReadableWritable> {
+
+	/**
+	 * Returns the logical channel indexes, to which the given record should be
+	 * written.
+	 *
+	 * @param record      the record to the determine the output channels for
+	 * @param numChannels the total number of output channels which are attached to respective output gate
+	 * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through
+	 * which the record shall be forwarded
+	 */
+	int[] selectChannels(T record, int numChannels);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
new file mode 100644
index 0000000..ae87aff
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
+
+/**
+ * A record-oriented runtime result writer.
+ * <p>
+ * The RecordWriter wraps the runtime's {@link BufferWriter} and takes care of
+ * serializing records into buffers.
+ * <p>
+ * <strong>Important</strong>: it is necessary to call {@link #flush()} after
+ * all records have been written with {@link #emit(IOReadableWritable)}. This
+ * ensures that all produced records are written to the output stream (incl.
+ * partially filled ones).
+ *
+ * @param <T> the type of the record that can be emitted with this record writer
+ */
+public class RecordWriter<T extends IOReadableWritable> {
+
+	protected final BufferWriter writer;
+
+	private final ChannelSelector<T> channelSelector;
+
+	private final int numChannels;
+
+	/** {@link RecordSerializer} per outgoing channel */
+	private RecordSerializer<T>[] serializers;
+
+	public RecordWriter(BufferWriter writer) {
+		this(writer, new RoundRobinChannelSelector<T>());
+	}
+
+	public RecordWriter(BufferWriter writer, ChannelSelector<T> channelSelector) {
+		this.writer = writer;
+		this.channelSelector = channelSelector;
+
+		this.numChannels = writer.getNumberOfOutputChannels();
+
+		/**
+		 * The runtime exposes a channel abstraction for the produced results
+		 * (see {@link ChannelSelector}). Every channel has an independent
+		 * serializer.
+		 */
+		this.serializers = new SpanningRecordSerializer[numChannels];
+		for (int i = 0; i < numChannels; i++) {
+			serializers[i] = new SpanningRecordSerializer<T>();
+		}
+	}
+
+	public boolean isFinished() {
+		return writer.isFinished();
+	}
+
+	public void emit(T record) throws IOException, InterruptedException {
+		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
+			// serialize with corresponding serializer and send full buffer
+			RecordSerializer<T> serializer = serializers[targetChannel];
+
+			synchronized (serializer) {
+				SerializationResult result = serializer.addRecord(record);
+				while (result.isFullBuffer()) {
+					Buffer buffer = serializer.getCurrentBuffer();
+
+					if (buffer != null) {
+						writer.writeBuffer(buffer, targetChannel);
+					}
+
+					buffer = writer.getBufferProvider().requestBufferBlocking();
+					result = serializer.setNextBuffer(buffer);
+				}
+			}
+		}
+	}
+
+	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
+		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
+			RecordSerializer<T> serializer = serializers[targetChannel];
+
+			synchronized (serializer) {
+				Buffer buffer = serializer.getCurrentBuffer();
+				if (buffer == null) {
+					writer.writeEvent(event, targetChannel);
+				}
+				else {
+					writer.writeBuffer(buffer, targetChannel);
+					writer.writeEvent(event, targetChannel);
+
+					buffer = writer.getBufferProvider().requestBufferBlocking();
+					serializer.setNextBuffer(buffer);
+				}
+			}
+		}
+	}
+
+	public void sendEndOfSuperstep() throws IOException, InterruptedException {
+		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
+			RecordSerializer<T> serializer = serializers[targetChannel];
+
+			synchronized (serializer) {
+				Buffer buffer = serializer.getCurrentBuffer();
+				if (buffer != null) {
+
+					writer.writeBuffer(buffer, targetChannel);
+
+					buffer = writer.getBufferProvider().requestBufferBlocking();
+					serializer.setNextBuffer(buffer);
+				}
+			}
+		}
+
+		writer.writeEndOfSuperstep();
+	}
+
+	public void flush() throws IOException {
+		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
+			RecordSerializer<T> serializer = serializers[targetChannel];
+
+			synchronized (serializer) {
+				Buffer buffer = serializer.getCurrentBuffer();
+				serializer.clear();
+
+				if (buffer != null) {
+					writer.writeBuffer(buffer, targetChannel);
+				}
+			}
+		}
+	}
+
+	public void clearBuffers() {
+		if (serializers != null) {
+			for (RecordSerializer<?> s : serializers) {
+				Buffer b = s.getCurrentBuffer();
+				if (b != null && !b.isRecycled()) {
+					b.recycle();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
new file mode 100644
index 0000000..46af5a7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.api.writer;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+/**
+ * This is the default implementation of the {@link ChannelSelector} interface. It represents a simple round-robin
+ * strategy, i.e. regardless of the record every attached exactly one output channel is selected at a time.
+
+ * @param <T>
+ *        the type of record which is sent through the attached output gate
+ */
+public class RoundRobinChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> {
+
+	/**
+	 * Stores the index of the channel to send the next record to.
+	 */
+	private final int[] nextChannelToSendTo = new int[1];
+
+	/**
+	 * Constructs a new default channel selector.
+	 */
+	public RoundRobinChannelSelector() {
+		this.nextChannelToSendTo[0] = 0;
+	}
+
+
+	@Override
+	public int[] selectChannels(final T record, final int numberOfOutputChannels) {
+
+		this.nextChannelToSendTo[0] = (this.nextChannelToSendTo[0] + 1) % numberOfOutputChannels;
+
+		return this.nextChannelToSendTo;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
new file mode 100644
index 0000000..24cd106
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Wrapper for pooled {@link MemorySegment} instances.
+ */
+public class Buffer {
+
+	/** The backing {@link MemorySegment} instance */
+	private final MemorySegment memorySegment;
+
+	private final Object recycleLock = new Object();
+
+	/** The recycler for the backing {@link MemorySegment} */
+	private final BufferRecycler recycler;
+
+	private final boolean isBuffer;
+
+	/** The current number of references to this buffer */
+	private int referenceCount = 1;
+
+	/**
+	 * The current size of the buffer in the range from 0 (inclusive) to the
+	 * size of the backing {@link MemorySegment} (inclusive).
+	 */
+	private int currentSize;
+
+	public Buffer(MemorySegment memorySegment, BufferRecycler recycler) {
+		this(memorySegment, recycler, true);
+	}
+
+	public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer) {
+		this.memorySegment = checkNotNull(memorySegment);
+		this.recycler = checkNotNull(recycler);
+		this.isBuffer = isBuffer;
+
+		this.currentSize = memorySegment.size();
+	}
+
+	public boolean isBuffer() {
+		return isBuffer;
+	}
+
+	public MemorySegment getMemorySegment() {
+		synchronized (recycleLock) {
+			ensureNotRecycled();
+
+			return memorySegment;
+		}
+	}
+
+	public ByteBuffer getNioBuffer() {
+		synchronized (recycleLock) {
+			ensureNotRecycled();
+
+			return memorySegment.wrap(0, currentSize).duplicate();
+		}
+	}
+
+	public int getSize() {
+		synchronized (recycleLock) {
+			ensureNotRecycled();
+
+			return currentSize;
+		}
+	}
+
+	public void setSize(int newSize) {
+		synchronized (recycleLock) {
+			ensureNotRecycled();
+
+			checkArgument(newSize >= 0 && newSize <= memorySegment.size(), "Size of buffer must be >= 0 and <= " + memorySegment.size() + ", but was " + newSize + ".");
+
+			currentSize = newSize;
+		}
+	}
+
+	public void recycle() {
+		synchronized (recycleLock) {
+			ensureNotRecycled();
+
+			if (--referenceCount == 0) {
+				recycler.recycle(memorySegment);
+			}
+		}
+	}
+
+	public Buffer retain() {
+		synchronized (recycleLock) {
+			ensureNotRecycled();
+
+			referenceCount++;
+
+			return this;
+		}
+	}
+
+	public boolean isRecycled() {
+		synchronized (recycleLock) {
+			return referenceCount == 0;
+		}
+	}
+
+	// Must be called from synchronized scope
+	private void ensureNotRecycled() {
+		checkState(referenceCount > 0, "Buffer has already been recycled.");
+	}
+
+	@Override
+	public String toString() {
+		synchronized (recycleLock) {
+			return String.format("Buffer %s [size: %d, reference count: %d]", hashCode(), currentSize, referenceCount);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
new file mode 100644
index 0000000..15ac7a3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import java.io.IOException;
+
/**
 * A pool of {@link Buffer} instances: hands out buffers as a
 * {@link BufferProvider} and takes the backing {@link org.apache.flink.core.memory.MemorySegment}s
 * back as a {@link BufferRecycler}.
 */
public interface BufferPool extends BufferProvider, BufferRecycler {

	/**
	 * Registers the owner of this pool. The owner is asked to recycle buffers
	 * when the pool is scaled down below the number of segments it currently
	 * has requested (see {@link #setNumBuffers(int)}).
	 */
	void setBufferPoolOwner(BufferPoolOwner owner);

	/**
	 * Destroys the pool, returning all of its memory segments.
	 */
	void destroy() throws IOException;

	/**
	 * Returns whether the pool has been destroyed.
	 */
	boolean isDestroyed();

	/**
	 * Returns the minimum number of memory segments this pool requires.
	 */
	int getNumberOfRequiredMemorySegments();

	/**
	 * Returns the current target size of this pool, in number of buffers.
	 */
	int getNumBuffers();

	/**
	 * Resizes this pool to the given number of buffers; must not be smaller
	 * than the number of required memory segments.
	 */
	void setNumBuffers(int numBuffers) throws IOException;

	/**
	 * Returns the number of memory segments that have been requested but are
	 * currently not handed out as buffers.
	 */
	int getNumberOfAvailableMemorySegments();

}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
new file mode 100644
index 0000000..fcd4d96
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import java.io.IOException;
+
/**
 * Factory for {@link BufferPool} instances.
 */
public interface BufferPoolFactory {

	/**
	 * Creates a buffer pool holding at least the given number of required buffers.
	 *
	 * @param numRequiredBuffers minimum number of buffers the pool must provide
	 * @param isFixedSize presumably fixes the pool size at the required number of
	 *        buffers instead of allowing it to grow — TODO confirm against the
	 *        implementing factory
	 */
	BufferPool createBufferPool(int numRequiredBuffers, boolean isFixedSize) throws IOException;

	/**
	 * Destroys the given buffer pool, which must have been created by this factory.
	 */
	void destroyBufferPool(BufferPool bufferPool) throws IOException;

}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java
new file mode 100644
index 0000000..65c948a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import java.io.IOException;
+
/**
 * Callback interface for the owner of a {@link BufferPool}. The pool asks its
 * owner to give buffers back when the pool is scaled down while more segments
 * are still in use than the new pool size allows.
 */
public interface BufferPoolOwner {

	/**
	 * Requests that the owner recycle the given number of buffers back to the pool.
	 */
	void recycleBuffers(int numBuffersToRecycle) throws IOException;

}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
new file mode 100644
index 0000000..d898845
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.io.IOException;
+
/**
 * Hands out {@link Buffer} instances, either non-blocking or blocking, and
 * supports registering listeners that are notified when a buffer becomes
 * available.
 */
public interface BufferProvider {

	/**
	 * Returns a {@link Buffer} instance from the buffer provider, if one is available.
	 * <p>
	 * Returns <code>null</code> if no buffer is available or the buffer provider has been destroyed.
	 */
	Buffer requestBuffer() throws IOException;

	/**
	 * Returns a {@link Buffer} instance from the buffer provider.
	 * <p>
	 * If there is no buffer available, the call will block until one becomes available again or the
	 * buffer provider has been destroyed.
	 */
	Buffer requestBufferBlocking() throws IOException, InterruptedException;

	/**
	 * Adds a buffer availability listener to the buffer provider.
	 * <p>
	 * The operation fails with return value <code>false</code> when there is a buffer available or
	 * the buffer provider has been destroyed; in that case the listener is not registered and will
	 * never be called.
	 * <p>
	 * If the buffer provider gets destroyed while the listener is registered, the listener will be
	 * notified with a <code>null</code> value.
	 */
	boolean addListener(EventListener<Buffer> listener);

}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
new file mode 100644
index 0000000..a6495d0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
/**
 * Receives {@link MemorySegment} instances whose wrapping {@link Buffer} is no
 * longer referenced, so that the memory can be reused.
 */
public interface BufferRecycler {

	/**
	 * Recycles the {@link MemorySegment} to its original {@link BufferPool}
	 * instance.
	 *
	 * @param memorySegment The memory segment to be recycled.
	 */
	void recycle(MemorySegment memorySegment);
}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
new file mode 100644
index 0000000..9e4b5a1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * A buffer pool used to manage a number of {@link Buffer} instances from the
+ * {@link NetworkBufferPool}.
+ * <p>
+ * Buffer requests are mediated to the network buffer pool to ensure dead-lock
+ * free operation of the network stack by limiting the number of buffers per
+ * local buffer pool. It also implements the default mechanism for buffer
+ * recycling, which ensures that every buffer is ultimately returned to the
+ * network buffer pool.
+ */
class LocalBufferPool implements BufferPool {

	private final NetworkBufferPool networkBufferPool;

	// The minimum number of required segments for this pool
	private final int numberOfRequiredMemorySegments;

	// The currently available memory segments. These are segments, which have been requested from
	// the network buffer pool and are currently not handed out as Buffer instances.
	// This queue also serves as the monitor guarding all mutable state of the pool.
	private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();

	// Buffer availability listeners, which need to be notified when a Buffer becomes available.
	// Listeners can only be registered at a time/state where no Buffer instance was available.
	private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>();

	// The current size of this pool
	private int currentPoolSize;

	// Number of all memory segments, which have been requested from the network buffer pool and are
	// somehow referenced through this pool (e.g. wrapped in Buffer instances or as available segments).
	private int numberOfRequestedMemorySegments;

	// Set by destroy(); once set, requests return null and no listeners can register.
	private boolean isDestroyed;

	// Optional owner, asked to recycle buffers when the pool shrinks (see setNumBuffers).
	private BufferPoolOwner owner;

	LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
		this.networkBufferPool = networkBufferPool;
		this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
		// The pool starts out at its minimum size.
		this.currentPoolSize = numberOfRequiredMemorySegments;
	}

	// ------------------------------------------------------------------------
	// Properties
	// ------------------------------------------------------------------------

	@Override
	public boolean isDestroyed() {
		synchronized (availableMemorySegments) {
			return isDestroyed;
		}
	}

	@Override
	public int getNumberOfRequiredMemorySegments() {
		return numberOfRequiredMemorySegments;
	}

	@Override
	public int getNumberOfAvailableMemorySegments() {
		synchronized (availableMemorySegments) {
			return availableMemorySegments.size();
		}
	}

	@Override
	public int getNumBuffers() {
		synchronized (availableMemorySegments) {
			return currentPoolSize;
		}
	}

	@Override
	public void setBufferPoolOwner(BufferPoolOwner owner) {
		synchronized (availableMemorySegments) {
			// The owner can only be set once.
			checkState(this.owner == null, "Buffer pool owner has already been set.");
			this.owner = checkNotNull(owner);
		}
	}

	@Override
	public Buffer requestBuffer() throws IOException {
		try {
			return requestBuffer(false);
		}
		catch (InterruptedException e) {
			// NOTE(review): the interrupt status is not restored before wrapping;
			// consider Thread.currentThread().interrupt() here. In practice the
			// non-blocking path never waits, so this catch should be unreachable.
			throw new IOException(e);
		}
	}

	@Override
	public Buffer requestBufferBlocking() throws IOException, InterruptedException {
		return requestBuffer(true);
	}

	/**
	 * Requests a buffer, optionally blocking until one becomes available.
	 * Returns <code>null</code> if the pool is destroyed, or — in non-blocking
	 * mode — if no segment is available and none can be requested from the
	 * network buffer pool.
	 */
	private Buffer requestBuffer(boolean isBlocking) throws InterruptedException {
		synchronized (availableMemorySegments) {
			if (isDestroyed) {
				return null;
			}

			// Hand segments back first, in case the pool was scaled down in the meantime.
			returnExcessMemorySegments();

			while (availableMemorySegments.isEmpty()) {
				// Below target size: try to pull a fresh segment from the network pool.
				if (numberOfRequestedMemorySegments < currentPoolSize) {
					final MemorySegment segment = networkBufferPool.requestMemorySegment();

					if (segment != null) {
						numberOfRequestedMemorySegments++;
						availableMemorySegments.add(segment);

						continue;
					}
				}

				if (isBlocking) {
					// Wait (with timeout) until recycle() adds a segment and notifies.
					// NOTE(review): the loop does not re-check isDestroyed after waking
					// up, and destroy() never notifies this monitor — a request blocked
					// here while the pool is destroyed concurrently can keep looping on
					// the 2s timeout. Confirm whether this is intended.
					availableMemorySegments.wait(2000);
				}
				else {
					return null;
				}
			}

			return new Buffer(availableMemorySegments.poll(), this);
		}
	}

	/**
	 * Takes back a segment from a recycled {@link Buffer}: either returns it to
	 * the network pool (pool destroyed or over-sized), hands it directly to a
	 * waiting listener, or makes it available again.
	 */
	@Override
	public void recycle(MemorySegment segment) {
		synchronized (availableMemorySegments) {
			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
				returnMemorySegment(segment);
			}
			else {
				EventListener<Buffer> listener = registeredListeners.poll();

				if (listener == null) {
					availableMemorySegments.add(segment);
					availableMemorySegments.notify();
				}
				else {
					try {
						listener.onEvent(new Buffer(segment, this));
					}
					catch (Throwable ignored) {
						// Listener failures are deliberately swallowed; fall back to
						// making the segment available to other requesters.
						availableMemorySegments.add(segment);
						availableMemorySegments.notify();
					}
				}
			}
		}
	}

	/**
	 * Destroy is called after the produce or consume phase of a task finishes.
	 */
	@Override
	public void destroy() throws IOException {
		synchronized (availableMemorySegments) {
			if (!isDestroyed) {
				// Return all idle segments to the network pool...
				MemorySegment segment;
				while ((segment = availableMemorySegments.poll()) != null) {
					returnMemorySegment(segment);
				}

				// ...and notify all pending listeners with null (per the
				// BufferProvider contract).
				EventListener<Buffer> listener;
				while ((listener = registeredListeners.poll()) != null) {
					listener.onEvent(null);
				}

				isDestroyed = true;
			}
		}

		// Deregister from the network pool outside the monitor to avoid nesting locks.
		networkBufferPool.destroyBufferPool(this);
	}

	@Override
	public boolean addListener(EventListener<Buffer> listener) {
		synchronized (availableMemorySegments) {
			// Listeners may only register while no buffer is available and the
			// pool is still alive.
			if (!availableMemorySegments.isEmpty() || isDestroyed) {
				return false;
			}

			registeredListeners.add(listener);
			return true;
		}
	}

	@Override
	public void setNumBuffers(int numBuffers) throws IOException {
		synchronized (availableMemorySegments) {
			checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " + numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");

			currentPoolSize = numBuffers;

			// Immediately return idle excess segments.
			returnExcessMemorySegments();

			// If there is a registered owner and we have still requested more buffers than our
			// size, trigger a recycle via the owner.
			if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
				owner.recycleBuffers(numberOfRequestedMemorySegments - numBuffers);
			}
		}
	}

	@Override
	public String toString() {
		synchronized (availableMemorySegments) {
			return String.format("[size: %d, required: %d, requested: %d, available: %d, listeners: %d, destroyed: %s]", currentPoolSize, numberOfRequiredMemorySegments, numberOfRequestedMemorySegments, availableMemorySegments.size(), registeredListeners.size(), isDestroyed);
		}
	}

	// ------------------------------------------------------------------------

	// Must be called from synchronized scope: gives one segment back to the
	// network pool and updates the request count.
	private void returnMemorySegment(MemorySegment segment) {
		numberOfRequestedMemorySegments--;
		networkBufferPool.recycle(segment);
	}

	// Must be called from synchronized scope: returns idle segments to the
	// network pool until the request count no longer exceeds the pool size
	// (or no idle segments remain).
	private void returnExcessMemorySegments() {
		while (numberOfRequestedMemorySegments > currentPoolSize) {
			MemorySegment segment = availableMemorySegments.poll();
			if (segment == null) {
				return;
			}

			networkBufferPool.recycle(segment);
			numberOfRequestedMemorySegments--;
		}
	}

}


Mime
View raw message