flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [10/13] flink git commit: [FLINK-1350] [runtime] Add blocking result partition variant
Date Wed, 18 Mar 2015 16:48:59 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index 8889f70..cca04b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -19,10 +19,13 @@
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.NotificationListener;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -37,36 +40,42 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * @param <R> The type of request (e.g. <tt>ReadRequest</tt> or <tt>WriteRequest</tt> issued by this access to the I/O threads.
  */
 public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends AbstractFileIOChannel {
-	
-	/** The lock that is used during closing to synchronize the thread that waits for all
-	 * requests to be handled with the asynchronous I/O thread. */
+
+	private final Object listenerLock = new Object();
+
+	/**
+	 * The lock that is used during closing to synchronize the thread that waits for all
+	 * requests to be handled with the asynchronous I/O thread.
+	 */
 	protected final Object closeLock = new Object();
-	
+
 	/** A request queue for submitting asynchronous requests to the corresponding IO worker thread. */
 	protected final RequestQueue<R> requestQueue;
-	
+
 	/** An atomic integer that counts the number of requests that we still wait for to return. */
 	protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
 	
 	/** Handler for completed requests */
 	protected final RequestDoneCallback<T> resultHandler;
-	
-	/** An exception that was encountered by the asynchronous request handling thread.*/
+
+	/** An exception that was encountered by the asynchronous request handling thread. */
 	protected volatile IOException exception;
-	
+
 	/** Flag marking this channel as closed */
 	protected volatile boolean closed;
 
+	private NotificationListener allRequestsProcessedListener;
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates a new channel access to the path indicated by the given ID. The channel accepts buffers to be
-	 * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers 
+	 * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers
 	 * are returned by adding the to the given queue.
-	 * 
-	 * @param channelID The id describing the path of the file that the channel accessed.
+	 *
+	 * @param channelID    The id describing the path of the file that the channel accessed.
 	 * @param requestQueue The queue that this channel hands its IO requests to.
-	 * @param callback The callback to be invoked when a request is done.
+	 * @param callback     The callback to be invoked when a request is done.
 	 * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
 	 *                     than in read-only mode.
 	 * @throws IOException Thrown, if the channel could no be opened.
@@ -79,21 +88,25 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
 		this.requestQueue = checkNotNull(requestQueue);
 		this.resultHandler = checkNotNull(callback);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public boolean isClosed() {
 		return this.closed;
 	}
-	
+
 	/**
-	 * Closes the reader and waits until all pending asynchronous requests are
-	 * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
-	 * 
+	 * Closes the channel and waits until all pending asynchronous requests are processed. The
+	 * underlying <code>FileChannel</code> is closed even if an exception interrupts the closing.
+	 *
+	 * <p> <strong>Important:</strong> the {@link #isClosed()} method returns <code>true</code>
+	 * immediately after this method has been called even when there are outstanding requests.
+	 *
 	 * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
 	 *                     the closing was interrupted.
 	 */
+	@Override
 	public void close() throws IOException {
 		// atomically set the close flag
 		synchronized (this.closeLock) {
@@ -101,7 +114,7 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
 				return;
 			}
 			this.closed = true;
-			
+
 			try {
 				// wait until as many buffers have been returned as were written
 				// only then is everything guaranteed to be consistent.
@@ -136,9 +149,10 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
 	 * <p>
 	 * Even if an exception interrupts the closing, such that not all request are handled,
 	 * the underlying <tt>FileChannel</tt> is closed and deleted.
-	 * 
+	 *
 	 * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if the closing was interrupted.
 	 */
+	@Override
 	public void closeAndDelete() throws IOException {
 		try {
 			close();
@@ -147,11 +161,11 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
 			deleteChannel();
 		}
 	}
-	
+
 	/**
 	 * Checks the exception state of this channel. The channel is erroneous, if one of its requests could not
 	 * be processed correctly.
-	 * 
+	 *
 	 * @throws IOException Thrown, if the channel is erroneous. The thrown exception contains the original exception
 	 *                     that defined the erroneous state as its cause.
 	 */
@@ -160,15 +174,15 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
 			throw this.exception;
 		}
 	}
-	
+
 	/**
 	 * Handles a processed <tt>Buffer</tt>. This method is invoked by the
 	 * asynchronous IO worker threads upon completion of the IO request with the
 	 * provided buffer and/or an exception that occurred while processing the request
 	 * for that buffer.
-	 * 
+	 *
 	 * @param buffer The buffer to be processed.
-	 * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
+	 * @param ex     The exception that occurred in the I/O threads when processing the buffer's request.
 	 */
 	final protected void handleProcessedBuffer(T buffer, IOException ex) {
 		if (buffer == null) {
@@ -186,13 +200,26 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
 			}
 		}
 		finally {
-			// decrement the number of missing buffers. If we are currently closing, notify the waiters
+			NotificationListener listener = null;
+
+			// Decrement the number of outstanding requests. If we are currently closing, notify the
+			// waiters. If there is a listener, notify it as well.
 			synchronized (this.closeLock) {
-				final int num = this.requestsNotReturned.decrementAndGet();
-				if (this.closed && num == 0) {
-					this.closeLock.notifyAll();
+				if (this.requestsNotReturned.decrementAndGet() == 0) {
+					if (this.closed) {
+						this.closeLock.notifyAll();
+					}
+
+					synchronized (listenerLock) {
+						listener = allRequestsProcessedListener;
+						allRequestsProcessedListener = null;
+					}
 				}
 			}
+
+			if (listener != null) {
+				listener.onNotification();
+			}
 		}
 	}
 
@@ -202,14 +229,57 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
 
 		// write the current buffer and get the next one
 		this.requestsNotReturned.incrementAndGet();
+
 		if (this.closed || this.requestQueue.isClosed()) {
 			// if we found ourselves closed after the counter increment,
 			// decrement the counter again and do not forward the request
 			this.requestsNotReturned.decrementAndGet();
+
+			final NotificationListener listener;
+
+			synchronized (listenerLock) {
+				listener = allRequestsProcessedListener;
+				allRequestsProcessedListener = null;
+			}
+
+			if (listener != null) {
+				listener.onNotification();
+			}
+
 			throw new IOException("I/O channel already closed. Could not fulfill: " + request);
 		}
+
 		this.requestQueue.add(request);
 	}
+
+	/**
+	 * Registers a listener to be notified when all outstanding requests have been processed.
+	 *
+	 * <p> New requests can arrive right after the listener got notified. Therefore, it is not safe
+	 * to assume that the number of outstanding requests is still zero after a notification unless
+	 * there was a close right before the listener got called.
+	 *
+	 * <p> Returns <code>true</code>, if the registration was successful. A registration can fail,
+	 * if there are no outstanding requests when trying to register a listener.
+	 */
+	protected boolean registerAllRequestsProcessedListener(NotificationListener listener) throws IOException {
+		checkNotNull(listener);
+
+		synchronized (listenerLock) {
+			if (allRequestsProcessedListener == null) {
+				// There was a race with the processing of the last outstanding request
+				if (requestsNotReturned.get() == 0) {
+					return false;
+				}
+
+				allRequestsProcessedListener = listener;
+
+				return true;
+			}
+		}
+
+		throw new IllegalStateException("Already subscribed.");
+	}
 }
 
 //--------------------------------------------------------------------------------------------
@@ -218,11 +288,11 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
  * Read request that reads an entire memory segment from a block reader.
  */
 final class SegmentReadRequest implements ReadRequest {
-	
+
 	private final AsynchronousFileIOChannel<MemorySegment, ReadRequest> channel;
-	
+
 	private final MemorySegment segment;
-	
+
 	protected SegmentReadRequest(AsynchronousFileIOChannel<MemorySegment, ReadRequest> targetChannel, MemorySegment segment) {
 		this.channel = targetChannel;
 		this.segment = segment;
@@ -254,11 +324,11 @@ final class SegmentReadRequest implements ReadRequest {
  * Write request that writes an entire memory segment to the block writer.
  */
 final class SegmentWriteRequest implements WriteRequest {
-	
+
 	private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel;
-	
+
 	private final MemorySegment segment;
-	
+
 	protected SegmentWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> targetChannel, MemorySegment segment) {
 		this.channel = targetChannel;
 		this.segment = segment;
@@ -280,6 +350,135 @@ final class SegmentWriteRequest implements WriteRequest {
 	}
 }
 
+final class BufferWriteRequest implements WriteRequest {
+
+	private final AsynchronousFileIOChannel<Buffer, WriteRequest> channel;
+
+	private final Buffer buffer;
+
+	protected BufferWriteRequest(AsynchronousFileIOChannel<Buffer, WriteRequest> targetChannel, Buffer buffer) {
+		this.channel = checkNotNull(targetChannel);
+		this.buffer = checkNotNull(buffer);
+	}
+
+	@Override
+	public void write() throws IOException {
+		final ByteBuffer header = ByteBuffer.allocateDirect(8);
+
+		header.putInt(buffer.isBuffer() ? 1 : 0);
+		header.putInt(buffer.getSize());
+		header.flip();
+
+		channel.fileChannel.write(header);
+		channel.fileChannel.write(buffer.getNioBuffer());
+	}
+
+	@Override
+	public void requestDone(IOException error) {
+		channel.handleProcessedBuffer(buffer, error);
+	}
+}
+
+final class BufferReadRequest implements ReadRequest {
+
+	private final AsynchronousFileIOChannel<Buffer, ReadRequest> channel;
+
+	private final Buffer buffer;
+
+	private final AtomicBoolean hasReachedEndOfFile;
+
+	protected BufferReadRequest(AsynchronousFileIOChannel<Buffer, ReadRequest> targetChannel, Buffer buffer, AtomicBoolean hasReachedEndOfFile) {
+		this.channel = targetChannel;
+		this.buffer = buffer;
+		this.hasReachedEndOfFile = hasReachedEndOfFile;
+	}
+
+	@Override
+	public void read() throws IOException {
+
+		final FileChannel fileChannel = channel.fileChannel;
+
+		if (fileChannel.size() - fileChannel.position() > 0) {
+			final ByteBuffer header = ByteBuffer.allocateDirect(8);
+
+			fileChannel.read(header);
+			header.flip();
+
+			final boolean isBuffer = header.getInt() == 1;
+			final int size = header.getInt();
+
+			if (size > buffer.getMemorySegment().size()) {
+				throw new IllegalStateException("Buffer is too small for data: " + buffer.getMemorySegment().size() + " bytes available, but " + size + " needed. This is most likely due to an serialized event, which is larger than the buffer size.");
+			}
+
+			buffer.setSize(size);
+
+			fileChannel.read(buffer.getNioBuffer());
+
+			if (!isBuffer) {
+				buffer.tagAsEvent();
+			}
+
+			hasReachedEndOfFile.set(fileChannel.size() - fileChannel.position() == 0);
+		}
+		else {
+			hasReachedEndOfFile.set(true);
+		}
+	}
+
+	@Override
+	public void requestDone(IOException error) {
+		channel.handleProcessedBuffer(buffer, error);
+	}
+}
+
+final class FileSegmentReadRequest implements ReadRequest {
+
+	private final AsynchronousFileIOChannel<FileSegment, ReadRequest> channel;
+
+	private final AtomicBoolean hasReachedEndOfFile;
+
+	private FileSegment fileSegment;
+
+	protected FileSegmentReadRequest(AsynchronousFileIOChannel<FileSegment, ReadRequest> targetChannel, AtomicBoolean hasReachedEndOfFile) {
+		this.channel = targetChannel;
+		this.hasReachedEndOfFile = hasReachedEndOfFile;
+	}
+
+	@Override
+	public void read() throws IOException {
+
+		final FileChannel fileChannel = channel.fileChannel;
+
+		if (fileChannel.size() - fileChannel.position() > 0) {
+			final ByteBuffer header = ByteBuffer.allocateDirect(8);
+
+			fileChannel.read(header);
+			header.flip();
+
+			final long position = fileChannel.position();
+
+			final boolean isBuffer = header.getInt() == 1;
+			final int length = header.getInt();
+
+			fileSegment = new FileSegment(fileChannel, position, length, isBuffer);
+
+			// Skip the binary data
+			fileChannel.position(position + length);
+
+			hasReachedEndOfFile.set(fileChannel.size() - fileChannel.position() == 0);
+		}
+		else {
+			hasReachedEndOfFile.set(true);
+		}
+	}
+
+	@Override
+	public void requestDone(IOException error) {
+		channel.handleProcessedBuffer(fileSegment, error);
+	}
+}
+
 /**
  * Request that seeks the underlying file channel to the given position.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
index 8f7f218..957052e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
@@ -21,15 +21,13 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.flink.core.memory.MemorySegment;
-
 /**
  * A reader that reads data in blocks from a file channel. The reader reads the blocks into a 
  * {@link org.apache.flink.core.memory.MemorySegment}. To support asynchronous implementations,
  * the read method does not immediately return the full memory segment, but rather adds it to
  * a blocking queue of finished read operations.
  */
-public interface BlockChannelReader extends FileIOChannel {
+public interface BlockChannelReader<T> extends FileIOChannel {
 
 	/**
 	 * Issues a read request, which will fill the given segment with the next block in the
@@ -39,33 +37,27 @@ public interface BlockChannelReader extends FileIOChannel {
 	 * @param segment The segment to read the block into.
 	 * @throws IOException Thrown, when the reader encounters an I/O error.
 	 */
-	void readBlock(MemorySegment segment) throws IOException;
+	void readBlock(T segment) throws IOException;
+
+	void seekToPosition(long position) throws IOException;
 	
 	/**
 	 * Gets the next memory segment that has been filled with data by the reader. This method blocks until
 	 * such a segment is available, or until an error occurs in the reader, or the reader is closed.
 	 * <p>
 	 * WARNING: If this method is invoked without any segment ever returning (for example, because the
-	 * {@link #readBlock(MemorySegment)} method has not been invoked appropriately), the method may block
+	 * {@link #readBlock(Object)} method has not been invoked appropriately), the method may block
 	 * forever.
 	 * 
 	 * @return The next memory segment from the reader's return queue.
 	 * @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
 	 */
-	public MemorySegment getNextReturnedSegment() throws IOException;
+	public T getNextReturnedBlock() throws IOException;
 	
 	/**
 	 * Gets the queue in which the full memory segments are queued after the read is complete.
 	 * 
 	 * @return The queue with the full memory segments.
 	 */
-	LinkedBlockingQueue<MemorySegment> getReturnQueue();
-	
-	/**
-	 * Seeks the underlying file channel to the given position.
-	 * 
-	 * @param position The position to seek to.
-	 */
-	void seekToPosition(long position) throws IOException;
+	LinkedBlockingQueue<T> getReturnQueue();
 }
-	
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
index 25c74e4..ccf065a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
@@ -21,15 +21,13 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.flink.core.memory.MemorySegment;
-
 /**
  * A writer that writes data in blocks to a file channel. The writer receives the data blocks in the form of 
  * {@link org.apache.flink.core.memory.MemorySegment}, which it writes entirely to the channel,
  * regardless of how space in the segment is used. The writing may be realized synchronously, or asynchronously,
  * depending on the implementation.
  */
-public interface BlockChannelWriter extends BlockChannelWriterWithCallback {
+public interface BlockChannelWriter<T> extends BlockChannelWriterWithCallback<T> {
 	
 	/**
 	 * Gets the next memory segment that has been written and is available again.
@@ -37,13 +35,13 @@ public interface BlockChannelWriter extends BlockChannelWriterWithCallback {
 	 * writer is closed.
 	 * <p>
 	 * NOTE: If this method is invoked without any segment ever returning (for example, because the
-	 * {@link #writeBlock(MemorySegment)} method has not been invoked accordingly), the method may block
+	 * {@link #writeBlock(Object)} method has not been invoked accordingly), the method may block
 	 * forever.
 	 * 
 	 * @return The next memory segment from the writers's return queue.
 	 * @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
 	 */
-	MemorySegment getNextReturnedSegment() throws IOException;
+	T getNextReturnedBlock() throws IOException;
 	
 	/**
 	 * Gets the queue in which the memory segments are queued after the asynchronous write
@@ -51,5 +49,5 @@ public interface BlockChannelWriter extends BlockChannelWriterWithCallback {
 	 * 
 	 * @return The queue with the written memory segments.
 	 */
-	LinkedBlockingQueue<MemorySegment> getReturnQueue();
+	LinkedBlockingQueue<T> getReturnQueue();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
index 57bc7e0..f7618e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
@@ -20,16 +20,14 @@ package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.IOException;
 
-import org.apache.flink.core.memory.MemorySegment;
+public interface BlockChannelWriterWithCallback<T> extends FileIOChannel {
 
-public interface BlockChannelWriterWithCallback extends FileIOChannel {
-	
 	/**
-	 * Writes the given memory segment. The request may be executed synchronously, or asynchronously, depending
+	 * Writes the given block. The request may be executed synchronously, or asynchronously, depending
 	 * on the implementation.
-	 * 
-	 * @param segment The segment to be written.
+	 *
+	 * @param block The block to be written.
 	 * @throws IOException Thrown, when the writer encounters an I/O error.
 	 */
-	void writeBlock(MemorySegment segment) throws IOException;
+	void writeBlock(T block) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileReader.java
new file mode 100644
index 0000000..74999e2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileReader.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+
+public interface BufferFileReader extends FileIOChannel {
+
+	void readInto(Buffer buffer) throws IOException;
+
+	void seekToPosition(long position) throws IOException;
+
+	boolean hasReachedEndOfFile();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileSegmentReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileSegmentReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileSegmentReader.java
new file mode 100644
index 0000000..fa25d4f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileSegmentReader.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+public interface BufferFileSegmentReader extends FileIOChannel {
+
+	void read() throws IOException;
+
+	void seekTo(long position) throws IOException;
+
+	boolean hasReachedEndOfFile();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriter.java
new file mode 100644
index 0000000..704aad2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.io.IOException;
+
+public interface BufferFileWriter extends BlockChannelWriterWithCallback<Buffer> {
+
+	/**
+	 * Returns the number of outstanding requests.
+	 */
+	int getNumberOfOutstandingRequests();
+
+	/**
+	 * Registers a listener, which is notified after all outstanding requests have been processed.
+	 */
+	boolean registerAllRequestsProcessedListener(NotificationListener listener) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
index d85ec82..b919034 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
  */
 public class ChannelReaderInputView extends AbstractPagedInputView {
 	
-	protected final BlockChannelReader reader;		// the block reader that reads memory segments
+	protected final BlockChannelReader<MemorySegment> reader;		// the block reader that reads memory segments
 	
 	protected int numRequestsRemaining;				// the number of block requests remaining
 	
@@ -63,7 +63,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
 	 * @throws IOException Thrown, if the read requests for the first blocks fail to be
 	 *                     served by the reader.
 	 */
-	public ChannelReaderInputView(BlockChannelReader reader, List<MemorySegment> memory, boolean waitForFirstBlock)
+	public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory, boolean waitForFirstBlock)
 	throws IOException
 	{
 		this(reader, memory, -1, waitForFirstBlock);
@@ -89,7 +89,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
 	 * @throws IOException Thrown, if the read requests for the first blocks fail to be
 	 *                     served by the reader.
 	 */
-	public ChannelReaderInputView(BlockChannelReader reader, List<MemorySegment> memory, 
+	public ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory,
 														int numBlocks, boolean waitForFirstBlock)
 	throws IOException
 	{
@@ -117,7 +117,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
 	 * 
 	 * @throws IOException
 	 */
-	ChannelReaderInputView(BlockChannelReader reader, List<MemorySegment> memory, 
+	ChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory,
 				int numBlocks, int headerLen, boolean waitForFirstBlock)
 	throws IOException
 	{
@@ -225,7 +225,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
 		}
 		
 		// get the next segment
-		final MemorySegment seg = this.reader.getNextReturnedSegment();
+		final MemorySegment seg = this.reader.getNextReturnedBlock();
 		
 		// check the header
 		if (seg.getShort(0) != ChannelWriterOutputView.HEADER_MAGIC_NUMBER) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
index 9824d34..089e10a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
@@ -61,7 +61,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	private final BlockChannelWriter writer;		// the writer to the channel
+	private final BlockChannelWriter<MemorySegment> writer;		// the writer to the channel
 	
 	private long bytesBeforeSegment;				// the number of bytes written before the current memory segment
 	
@@ -81,7 +81,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
 	 * @param memory The memory used to buffer data, or null, to utilize solely the return queue.
 	 * @param segmentSize The size of the memory segments.
 	 */
-	public ChannelWriterOutputView(BlockChannelWriter writer, List<MemorySegment> memory, int segmentSize) {
+	public ChannelWriterOutputView(BlockChannelWriter<MemorySegment> writer, List<MemorySegment> memory, int segmentSize) {
 		super(segmentSize, HEADER_LENGTH);
 		
 		if (writer == null) {
@@ -123,7 +123,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
 	 * @param writer The writer to write to.
 	 * @param segmentSize The size of the memory segments.
 	 */
-	public ChannelWriterOutputView(BlockChannelWriter writer, int segmentSize)
+	public ChannelWriterOutputView(BlockChannelWriter<MemorySegment> writer, int segmentSize)
 	{
 		this(writer, null, segmentSize);
 	}
@@ -203,7 +203,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
 			writeSegment(current, posInSegment, false);
 		}
 		
-		final MemorySegment next = this.writer.getNextReturnedSegment();
+		final MemorySegment next = this.writer.getNextReturnedBlock();
 		this.blockCount++;
 		return next;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index c5a3daa..f9ee90c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.channels.FileChannel;
 import java.util.Random;
 
 import org.apache.flink.util.StringUtils;
@@ -73,6 +74,8 @@ public interface FileIOChannel {
 	* @throws IOException Thrown, if an error occurred while waiting for pending requests.
 	*/
 	public void closeAndDelete() throws IOException;
+
+	FileChannel getNioFileChannel();
 	
 	// --------------------------------------------------------------------------------------------
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileSegment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileSegment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileSegment.java
new file mode 100644
index 0000000..7c3a83e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileSegment.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.nio.channels.FileChannel;
+
+/**
+ * Immutable value object bundling a {@link FileChannel} with a position, a length, and a flag
+ * telling whether the described region was marked as a buffer.
+ */
+public class FileSegment {
+
+	/** The channel handed to the constructor. */
+	private final FileChannel channel;
+
+	/** The position handed to the constructor (presumably a byte offset into the file - confirm). */
+	private final long offset;
+
+	/** The length handed to the constructor (presumably a number of bytes - confirm). */
+	private final int numBytes;
+
+	/** The buffer flag handed to the constructor. */
+	private final boolean buffer;
+
+	/**
+	 * Stores the given values as they are; no validation is performed.
+	 *
+	 * @param fileChannel The file channel of the segment.
+	 * @param position The position of the segment.
+	 * @param length The length of the segment.
+	 * @param isBuffer Whether the segment is flagged as a buffer.
+	 */
+	public FileSegment(FileChannel fileChannel, long position, int length, boolean isBuffer) {
+		channel = fileChannel;
+		offset = position;
+		numBytes = length;
+		buffer = isBuffer;
+	}
+
+	/** @return The file channel given at construction. */
+	public FileChannel getFileChannel() {
+		return channel;
+	}
+
+	/** @return The position given at construction. */
+	public long getPosition() {
+		return offset;
+	}
+
+	/** @return The length given at construction. */
+	public int getLength() {
+		return numBytes;
+	}
+
+	/** @return The buffer flag given at construction. */
+	public boolean isBuffer() {
+		return buffer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
index cdad3fb..63e86c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/HeaderlessChannelReaderInputView.java
@@ -60,7 +60,7 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView
 	 * @throws IOException Thrown, if the read requests for the first blocks fail to be
 	 *                     served by the reader.
 	 */
-	public HeaderlessChannelReaderInputView(BlockChannelReader reader, List<MemorySegment> memory, int numBlocks,
+	public HeaderlessChannelReaderInputView(BlockChannelReader<MemorySegment> reader, List<MemorySegment> memory, int numBlocks,
 			int numBytesInLastBlock, boolean waitForFirstBlock)
 	throws IOException
 	{
@@ -87,7 +87,7 @@ public class HeaderlessChannelReaderInputView extends ChannelReaderInputView
 		
 		// get the next segment
 		this.numBlocksRemaining--;
-		return this.reader.getNextReturnedSegment();
+		return this.reader.getNextReturnedBlock();
 	}
 	
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index c04ba97..c1a4b84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +36,21 @@ import java.util.concurrent.LinkedBlockingQueue;
  */
 public abstract class IOManager {
 
+	/** The two I/O modes: synchronous and asynchronous. */
+	public enum IOMode {
+
+		SYNC, ASYNC;
+
+		/**
+		 * Tells whether this mode is the synchronous one.
+		 *
+		 * @return {@code true} for {@link #SYNC}, {@code false} for {@link #ASYNC}.
+		 */
+		public boolean isSynchronous() {
+			return this == SYNC;
+		}
+	}
+
 	/** Logging */
 	protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
@@ -190,7 +206,7 @@ public abstract class IOManager {
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
+	public BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
 		return createBlockChannelWriter(channelID, new LinkedBlockingQueue<MemorySegment>());
 	}
 
@@ -203,7 +219,7 @@ public abstract class IOManager {
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public abstract BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
+	public abstract BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID,
 				LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
 
 	/**
@@ -216,7 +232,7 @@ public abstract class IOManager {
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public abstract BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException;
+	public abstract BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException;
 
 	/**
 	 * Creates a block channel reader that reads blocks from the given channel. The reader pushed
@@ -227,7 +243,7 @@ public abstract class IOManager {
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
+	public BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
 		return createBlockChannelReader(channelID, new LinkedBlockingQueue<MemorySegment>());
 	}
 
@@ -240,9 +256,15 @@ public abstract class IOManager {
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
+	public abstract BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID,
 										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
 
+	public abstract BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException;
+
+	public abstract BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException;
+
+	public abstract BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException;
+
 	/**
 	 * Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
 	 * The reader draws segments to read the blocks into from a supplied list, which must contain as many

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 2396665..e615913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
 import java.io.IOException;
@@ -143,7 +144,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 			}
 		}
 		finally {
-			// make sure we all the super implementation in any case and at the last point,
+			// make sure we call the super implementation in any case and at the last point,
 			// because this will clean up the I/O directories
 			super.shutdown();
 		}
@@ -182,7 +183,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	// ------------------------------------------------------------------------
 	
 	@Override
-	public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
+	public BlockChannelWriter<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID,
 								LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
 	{
 		checkState(!isShutdown.get(), "I/O-Manger is shut down.");
@@ -190,7 +191,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	}
 	
 	@Override
-	public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
+	public BlockChannelWriterWithCallback<MemorySegment> createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
 		checkState(!isShutdown.get(), "I/O-Manger is shut down.");
 		return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
 	}
@@ -206,13 +207,34 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
 	@Override
-	public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
+	public BlockChannelReader<MemorySegment> createBlockChannelReader(FileIOChannel.ID channelID,
 										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
 	{
 		checkState(!isShutdown.get(), "I/O-Manger is shut down.");
 		return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
 	}
-	
+
+	/**
+	 * Creates an {@link AsynchronousBufferFileWriter} for the given channel. The writer is handed
+	 * the request queue of the {@code writers} entry selected by the channel's thread number.
+	 * The call is rejected by {@code checkState} once {@code isShutdown} is set.
+	 *
+	 * @param channelID The channel to write to.
+	 * @return The newly created buffer file writer.
+	 * @throws IOException Declared for the writer construction (presumably if the channel cannot be opened).
+	 */
+	@Override
+	public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException {
+		final boolean stillRunning = !isShutdown.get();
+		checkState(stillRunning, "I/O-Manger is shut down.");
+
+		final int threadNum = channelID.getThreadNum();
+		return new AsynchronousBufferFileWriter(channelID, this.writers[threadNum].requestQueue);
+	}
+
+	/**
+	 * Creates an {@link AsynchronousBufferFileReader} for the given channel. The reader is handed
+	 * the request queue of the {@code readers} entry selected by the channel's thread number.
+	 * The call is rejected by {@code checkState} once {@code isShutdown} is set.
+	 *
+	 * @param channelID The channel to read from.
+	 * @param callback The callback passed on to the reader for completed requests.
+	 * @return The newly created buffer file reader.
+	 * @throws IOException Declared for the reader construction (presumably if the channel cannot be opened).
+	 */
+	@Override
+	public BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback<Buffer> callback) throws IOException {
+		final boolean stillRunning = !isShutdown.get();
+		checkState(stillRunning, "I/O-Manger is shut down.");
+
+		final int threadNum = channelID.getThreadNum();
+		return new AsynchronousBufferFileReader(channelID, this.readers[threadNum].requestQueue, callback);
+	}
+
+	/**
+	 * Creates an {@link AsynchronousBufferFileSegmentReader} for the given channel. The reader is
+	 * handed the request queue of the {@code readers} entry selected by the channel's thread number.
+	 * The call is rejected by {@code checkState} once {@code isShutdown} is set.
+	 *
+	 * @param channelID The channel to read from.
+	 * @param callback The callback passed on to the reader for completed requests.
+	 * @return The newly created buffer file segment reader.
+	 * @throws IOException Declared for the reader construction (presumably if the channel cannot be opened).
+	 */
+	@Override
+	public BufferFileSegmentReader createBufferFileSegmentReader(FileIOChannel.ID channelID, RequestDoneCallback<FileSegment> callback) throws IOException {
+		final boolean stillRunning = !isShutdown.get();
+		checkState(stillRunning, "I/O-Manger is shut down.");
+
+		final int threadNum = channelID.getThreadNum();
+		return new AsynchronousBufferFileSegmentReader(channelID, this.readers[threadNum].requestQueue, callback);
+	}
+
 	/**
 	 * Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
 	 * The reader draws segments to read the blocks into from a supplied list, which must contain as many

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
index 95f3dc7..a2e3e82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
@@ -21,26 +21,24 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.flink.core.memory.MemorySegment;
-
 /**
  * A {@link RequestDoneCallback} that adds the memory segments to a blocking queue.
  */
-public class QueuingCallback implements RequestDoneCallback<MemorySegment> {
+public class QueuingCallback<T> implements RequestDoneCallback<T> {
+
+	private final LinkedBlockingQueue<T> queue;
 
-	private final LinkedBlockingQueue<MemorySegment> queue;
-	
-	public QueuingCallback(LinkedBlockingQueue<MemorySegment> queue) {
+	public QueuingCallback(LinkedBlockingQueue<T> queue) {
 		this.queue = queue;
 	}
 
 	@Override
-	public void requestSuccessful(MemorySegment buffer) {
+	public void requestSuccessful(T buffer) {
 		queue.add(buffer);
 	}
 
 	@Override
-	public void requestFailed(MemorySegment buffer, IOException e) {
+	public void requestFailed(T buffer, IOException e) {
 		// the I/O error is recorded in the writer already
 		queue.add(buffer);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
new file mode 100644
index 0000000..27189cb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A synchronous {@link BufferFileReader} implementation.
+ *
+ * <p> This currently bypasses the I/O manager because it is the only synchronous implementation
+ * currently in use.
+ *
+ * TODO Refactor I/O manager setup and refactor this into it
+ */
+public class SynchronousBufferFileReader extends SynchronousFileIOChannel implements BufferFileReader {
+
+	/** Scratch space for the 8 byte header: an int flag (1 marks a buffer) followed by the int payload size. */
+	private final ByteBuffer header = ByteBuffer.allocateDirect(8);
+
+	/** Updated after every read that found data: true when no bytes remain behind the channel position. */
+	private boolean hasReachedEndOfFile;
+
+	public SynchronousBufferFileReader(ID channelID, boolean writeEnabled) throws IOException {
+		super(channelID, writeEnabled);
+	}
+
+	/**
+	 * Reads the next header and its payload from the current channel position into the given
+	 * buffer. If no bytes are left in the file, the buffer is recycled instead.
+	 *
+	 * @param buffer The buffer to read the payload into.
+	 * @throws IOException If reading fails or the file ends in the middle of a header or payload.
+	 * @throws IllegalStateException If the payload is larger than the buffer's memory segment.
+	 */
+	@Override
+	public void readInto(Buffer buffer) throws IOException {
+		if (fileChannel.size() - fileChannel.position() > 0) {
+			// This is the synchronous counterpart to the asynchronous buffer read request
+
+			// Read header; a short read would otherwise surface as a BufferUnderflowException below
+			header.clear();
+			readFully(header);
+			header.flip();
+
+			final boolean isBuffer = header.getInt() == 1;
+			final int size = header.getInt();
+
+			if (size > buffer.getMemorySegment().size()) {
+				throw new IllegalStateException("Buffer is too small for data: " + buffer.getMemorySegment().size() + " bytes available, but " + size + " needed. This is most likely due to an serialized event, which is larger than the buffer size.");
+			}
+
+			buffer.setSize(size);
+
+			// One NIO view is kept across reads so that a partial read resumes where it stopped.
+			// NOTE(review): assumes the view is limited to the size set above - confirm.
+			readFully(buffer.getNioBuffer());
+
+			if (!isBuffer) {
+				buffer.tagAsEvent();
+			}
+
+			hasReachedEndOfFile = fileChannel.size() - fileChannel.position() == 0;
+		}
+		else {
+			buffer.recycle();
+		}
+	}
+
+	/**
+	 * Fills the remaining space of the target from the file channel. A single channel read is
+	 * allowed to transfer fewer bytes than the target has remaining, hence the loop.
+	 *
+	 * @param target The byte buffer to fill up to its limit.
+	 * @throws IOException If the end of the file is reached before the target is full.
+	 */
+	private void readFully(ByteBuffer target) throws IOException {
+		while (target.hasRemaining()) {
+			if (fileChannel.read(target) < 0) {
+				throw new IOException("Unexpected end of file: " + target.remaining() + " more bytes expected.");
+			}
+		}
+	}
+
+	/** Moves the file channel to the given position; the end-of-file flag is left untouched. */
+	@Override
+	public void seekToPosition(long position) throws IOException {
+		fileChannel.position(position);
+	}
+
+	/** @return The end-of-file flag as determined by the last read that found data. */
+	@Override
+	public boolean hasReachedEndOfFile() {
+		return hasReachedEndOfFile;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
index fd6c230..19a0fc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
@@ -42,4 +42,4 @@ public abstract class SynchronousFileIOChannel extends AbstractFileIOChannel {
 			this.fileChannel.close();
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
new file mode 100644
index 0000000..5a31c3f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link ConnectionID} identifies a connection to a remote task manager by the socket address and
+ * a connection index. This allows multiple connections to the same task manager to be distinguished
+ * by their connection index.
+ *
+ * <p> The connection index is assigned by the {@link IntermediateResult} and ensures that it is
+ * safe to multiplex multiple data transfers over the same physical TCP connection.
+ */
+public class ConnectionID implements Serializable {
+
+	/** Socket address of the connection; never null. */
+	private final InetSocketAddress address;
+	/** Non-negative index of the connection. */
+	private final int connectionIndex;
+
+	/** Creates an ID from the address and data port of the given connection info. */
+	public ConnectionID(InstanceConnectionInfo connectionInfo, int connectionIndex) {
+		this(new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex);
+	}
+
+	/** Creates an ID; a null address and a negative connection index are rejected. */
+	public ConnectionID(InetSocketAddress address, int connectionIndex) {
+		this.address = checkNotNull(address);
+		checkArgument(connectionIndex >= 0);
+		this.connectionIndex = connectionIndex;
+	}
+
+	public InetSocketAddress getAddress() {
+		return address;
+	}
+
+	public int getConnectionIndex() {
+		return connectionIndex;
+	}
+
+	@Override
+	public int hashCode() {
+		return address.hashCode() + (31 * connectionIndex);
+	}
+
+	/** Equal only to a ConnectionID with the same address and index; null-safe per the equals contract. */
+	@Override
+	public boolean equals(Object other) {
+		if (other == null || other.getClass() != ConnectionID.class) {
+			return false;
+		}
+
+		final ConnectionID that = (ConnectionID) other;
+		return that.address.equals(address) && that.connectionIndex == connectionIndex;
+	}
+
+	@Override
+	public String toString() {
+		return address + " [" + connectionIndex + "]";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 76f8bbd..06dc151 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.runtime.io.network;
 
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 import java.io.IOException;
 
@@ -29,20 +30,20 @@ import java.io.IOException;
  */
 public interface ConnectionManager {
 
-	void start(IntermediateResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException;
+	void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) throws IOException;
 
 	/**
-	 * Creates a {@link PartitionRequestClient} instance for the given {@link RemoteAddress}.
+	 * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
 	 */
-	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException, InterruptedException;
+	PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException;
 
 	/**
 	 * Closes opened ChannelConnections in case of a resource release
-	 * @param remoteAddress
 	 */
-	void closeOpenChannelConnections(RemoteAddress remoteAddress);
+	void closeOpenChannelConnections(ConnectionID connectionId);
 
 	int getNumberOfActiveConnections();
 
 	void shutdown() throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 447f6e6..af6273e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.runtime.io.network;
 
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 import java.io.IOException;
 
@@ -30,16 +31,16 @@ import java.io.IOException;
 public class LocalConnectionManager implements ConnectionManager {
 
 	@Override
-	public void start(IntermediateResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
+	public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher, NetworkBufferPool networkbufferPool) throws IOException {
 	}
 
 	@Override
-	public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+	public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
 		return null;
 	}
 
 	@Override
-	public void closeOpenChannelConnections(RemoteAddress remoteAddress) {}
+	public void closeOpenChannelConnections(ConnectionID connectionId) {}
 
 	@Override
 	public int getNumberOfActiveConnections() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 58b21e1..e02e744 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -21,13 +21,14 @@ package org.apache.flink.runtime.io.network;
 import akka.actor.ActorRef;
 import akka.util.Timeout;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
-import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -54,7 +55,7 @@ public class NetworkEnvironment {
 
 	private final FiniteDuration jobManagerTimeout;
 
-	private final IntermediateResultPartitionManager partitionManager;
+	private final ResultPartitionManager partitionManager;
 
 	private final TaskEventDispatcher taskEventDispatcher;
 
@@ -62,6 +63,8 @@ public class NetworkEnvironment {
 
 	private final ConnectionManager connectionManager;
 
+	private final NetworkEnvironmentConfiguration configuration;
+
 	private boolean isShutdown;
 
 	/**
@@ -74,8 +77,9 @@ public class NetworkEnvironment {
 		this.jobManager = checkNotNull(jobManager);
 		this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
 
-		this.partitionManager = new IntermediateResultPartitionManager();
+		this.partitionManager = new ResultPartitionManager();
 		this.taskEventDispatcher = new TaskEventDispatcher();
+		this.configuration = checkNotNull(config);
 
 		// --------------------------------------------------------------------
 		// Network buffers
@@ -95,7 +99,7 @@ public class NetworkEnvironment {
 		connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get()) : new LocalConnectionManager();
 
 		try {
-			connectionManager.start(partitionManager, taskEventDispatcher);
+			connectionManager.start(partitionManager, taskEventDispatcher, networkBufferPool);
 		}
 		catch (Throwable t) {
 			throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
@@ -115,30 +119,29 @@ public class NetworkEnvironment {
 	}
 
 	public void registerTask(Task task) throws IOException {
-		final ExecutionAttemptID executionId = task.getExecutionId();
-
-		final IntermediateResultPartition[] producedPartitions = task.getProducedPartitions();
-		final BufferWriter[] writers = task.getWriters();
+		final ResultPartition[] producedPartitions = task.getProducedPartitions();
+		final ResultPartitionWriter[] writers = task.getWriters();
 
 		if (writers.length != producedPartitions.length) {
 			throw new IllegalStateException("Unequal number of writers and partitions.");
 		}
 
 		for (int i = 0; i < producedPartitions.length; i++) {
-			final IntermediateResultPartition partition = producedPartitions[i];
-			final BufferWriter writer = writers[i];
+			final ResultPartition partition = producedPartitions[i];
+			final ResultPartitionWriter writer = writers[i];
 
 			// Buffer pool for the partition
 			BufferPool bufferPool = null;
 
 			try {
-				bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfQueues(), false);
-				partition.setBufferPool(bufferPool);
+				bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
+				partition.registerBufferPool(bufferPool);
+
 				partitionManager.registerIntermediateResultPartition(partition);
 			}
 			catch (Throwable t) {
 				if (bufferPool != null) {
-					bufferPool.destroy();
+					bufferPool.lazyDestroy();
 				}
 
 				if (t instanceof IOException) {
@@ -150,7 +153,7 @@ public class NetworkEnvironment {
 			}
 
 			// Register writer with task event dispatcher
-			taskEventDispatcher.registerWriterForIncomingTaskEvents(executionId, writer.getPartitionId(), writer);
+			taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
 		}
 
 		// Setup the buffer pool for each buffer reader
@@ -165,7 +168,7 @@ public class NetworkEnvironment {
 			}
 			catch (Throwable t) {
 				if (bufferPool != null) {
-					bufferPool.destroy();
+					bufferPool.lazyDestroy();
 				}
 
 				if (t instanceof IOException) {
@@ -185,10 +188,16 @@ public class NetworkEnvironment {
 		final ExecutionAttemptID executionId = task.getExecutionId();
 
 		if (task.isCanceledOrFailed()) {
-			partitionManager.failIntermediateResultPartitions(executionId);
+			partitionManager.releasePartitionsProducedBy(executionId);
 		}
 
-		taskEventDispatcher.unregisterWriters(executionId);
+		ResultPartitionWriter[] writers = task.getWriters();
+
+		if (writers != null) {
+			for (ResultPartitionWriter writer : task.getWriters()) {
+				taskEventDispatcher.unregisterWriter(writer);
+			}
+		}
 
 		final SingleInputGate[] inputGates = task.getInputGates();
 
@@ -206,7 +215,7 @@ public class NetworkEnvironment {
 		}
 	}
 
-	public IntermediateResultPartitionManager getPartitionManager() {
+	public ResultPartitionManager getPartitionManager() {
 		return partitionManager;
 	}
 
@@ -222,6 +231,10 @@ public class NetworkEnvironment {
 		return networkBufferPool;
 	}
 
+	public IOMode getDefaultIOMode() {
+		return configuration.ioMode();
+	}
+
 	public boolean hasReleasedAllResources() {
 		String msg = String.format("Network buffer pool: %d missing memory segments. %d registered buffer pools. Connection manager: %d active connections. Task event dispatcher: %d registered writers.",
 				networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments(), networkBufferPool.getNumberOfRegisteredBufferPools(), connectionManager.getNumberOfActiveConnections(), taskEventDispatcher.getNumberOfRegisteredWriters());

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
deleted file mode 100644
index 937055b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.executiongraph.IntermediateResult;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * A {@link RemoteAddress} identifies a connection to a remote task manager by
- * the socket address and a connection index. This allows multiple connections
- * to be distinguished by their connection index.
- * <p>
- * The connection index is assigned by the {@link IntermediateResult} and
- * ensures that it is safe to multiplex multiple data transfers over the same
- * physical TCP connection.
- */
-public class RemoteAddress implements IOReadableWritable, Serializable {
-
-	private InetSocketAddress address;
-
-	private int connectionIndex;
-
-	public RemoteAddress(InstanceConnectionInfo connectionInfo, int connectionIndex) {
-		this(new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex);
-	}
-
-	public RemoteAddress(InetSocketAddress address, int connectionIndex) {
-		this.address = checkNotNull(address);
-		checkArgument(connectionIndex >= 0);
-		this.connectionIndex = connectionIndex;
-	}
-
-	public InetSocketAddress getAddress() {
-		return address;
-	}
-
-	public int getConnectionIndex() {
-		return connectionIndex;
-	}
-
-	@Override
-	public int hashCode() {
-		return address.hashCode() + (31 * connectionIndex);
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other.getClass() != RemoteAddress.class) {
-			return false;
-		}
-
-		final RemoteAddress ra = (RemoteAddress) other;
-		if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) {
-			return false;
-		}
-
-		return true;
-	}
-
-	@Override
-	public String toString() {
-		return address + " [" + connectionIndex + "]";
-	}
-
-	// ------------------------------------------------------------------------
-	// Serialization
-	// ------------------------------------------------------------------------
-
-	public RemoteAddress() {
-		this.address = null;
-		this.connectionIndex = -1;
-	}
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-		final InetAddress ia = address.getAddress();
-		out.writeInt(ia.getAddress().length);
-		out.write(ia.getAddress());
-		out.writeInt(address.getPort());
-
-		out.writeInt(connectionIndex);
-	}
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-		final byte[] addressBytes = new byte[in.readInt()];
-		in.readFully(addressBytes);
-
-		final InetAddress ia = InetAddress.getByAddress(addressBytes);
-		int port = in.readInt();
-
-		address = new InetSocketAddress(ia, port);
-		connectionIndex = in.readInt();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
index 7a529b9..845f72a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -18,59 +18,49 @@
 
 package org.apache.flink.runtime.io.network;
 
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.Table;
+import com.google.common.collect.Maps;
 import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.util.event.EventListener;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
 
 /**
- * The task event dispatcher dispatches events flowing backwards from a consumer
- * to a producer. It only supports programs, where the producer and consumer
- * are running at the same time.
- * <p>
- * The publish method is either called from the local input channel or the
- * network I/O thread.
+ * The task event dispatcher dispatches events flowing backwards from a consuming task to the task
+ * producing the consumed result.
+ *
+ * <p> Backwards events only work for tasks that produce pipelined results, where both the
+ * producing and consuming tasks are running at the same time.
  */
 public class TaskEventDispatcher {
 
-	Table<ExecutionAttemptID, IntermediateResultPartitionID, BufferWriter> registeredWriters = HashBasedTable.create();
+	private final Map<ResultPartitionID, ResultPartitionWriter> registeredWriters = Maps.newHashMap();
 
-	public void registerWriterForIncomingTaskEvents(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, BufferWriter listener) {
+	public void registerWriterForIncomingTaskEvents(ResultPartitionID partitionId, ResultPartitionWriter writer) {
 		synchronized (registeredWriters) {
-			if (registeredWriters.put(executionId, partitionId, listener) != null) {
-				throw new IllegalStateException("Event dispatcher already contains buffer writer.");
+			if (registeredWriters.put(partitionId, writer) != null) {
+				throw new IllegalStateException("Already registered at task event dispatcher.");
 			}
 		}
 	}
 
-	public void unregisterWriters(ExecutionAttemptID executionId) {
+	public void unregisterWriter(ResultPartitionWriter writer) {
 		synchronized (registeredWriters) {
-			List<IntermediateResultPartitionID> writersToUnregister = new ArrayList<IntermediateResultPartitionID>();
-
-			for (IntermediateResultPartitionID partitionId : registeredWriters.row(executionId).keySet()) {
-				writersToUnregister.add(partitionId);
-			}
-
-			for(IntermediateResultPartitionID partitionId : writersToUnregister) {
-				registeredWriters.remove(executionId, partitionId);
-			}
+			registeredWriters.remove(writer.getPartitionId());
 		}
 	}
 
 	/**
-	 * Publishes the event to the registered {@link EventListener} instance.
+	 * Publishes the event to the registered {@link ResultPartitionWriter} instances.
 	 * <p>
-	 * This method is either called from a local input channel or the network
-	 * I/O thread on behalf of a remote input channel.
+	 * This method is called either directly from a {@link LocalInputChannel} or from the network
+	 * I/O thread on behalf of a {@link RemoteInputChannel}.
 	 */
-	public boolean publish(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, TaskEvent event) {
-		EventListener<TaskEvent> listener = registeredWriters.get(executionId, partitionId);
+	public boolean publish(ResultPartitionID partitionId, TaskEvent event) {
+		EventListener<TaskEvent> listener = registeredWriters.get(partitionId);
 
 		if (listener != null) {
 			listener.onEvent(event);
@@ -80,6 +70,9 @@ public class TaskEventDispatcher {
 		return false;
 	}
 
+	/**
+	 * Returns the number of currently registered writers.
+	 */
 	int getNumberOfRegisteredWriters() {
 		synchronized (registeredWriters) {
 			return registeredWriters.size();

http://git-wip-us.apache.org/repos/asf/flink/blob/9d7acf36/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index e70b6ee..4ee7fad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -21,10 +21,10 @@ package org.apache.flink.runtime.io.network.api.reader;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 
 import java.io.IOException;
 
@@ -43,6 +43,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 
 	private boolean isFinished;
 
+	@SuppressWarnings("unchecked")
 	protected AbstractRecordReader(InputGate inputGate) {
 		super(inputGate);
 


Mime
View raw message