flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [3/3] incubator-flink git commit: [FLINK-1323] Refactor I/O Manager Readers and Writers to interfaces, add implementation that uses callbacks on completed write requests.
Date Tue, 11 Nov 2014 10:48:50 GMT
[FLINK-1323] Refactor I/O Manager Readers and Writers to interfaces, add implementation that uses callbacks on completed write requests.

 - This change also allows for a very simple way of plugging in a synchronous version of the I/O manager.

This closes #193.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c9cfe3ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c9cfe3ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c9cfe3ba

Branch: refs/heads/master
Commit: c9cfe3ba9a2009c3da2cb8a39090154c30ccd88c
Parents: 8e4c772
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Nov 7 18:45:19 2014 +0100
Committer: uce <uce@apache.org>
Committed: Tue Nov 11 11:48:14 2014 +0100

----------------------------------------------------------------------
 .../io/disk/ChannelReaderInputViewIterator.java |   6 +-
 .../disk/iomanager/AbstractFileIOChannel.java   | 106 ++++
 .../disk/iomanager/AsynchronousBlockReader.java | 131 +++++
 .../disk/iomanager/AsynchronousBlockWriter.java |  88 +++
 .../AsynchronousBlockWriterWithCallback.java    |  67 +++
 .../iomanager/AsynchronousBulkBlockReader.java  | 107 ++++
 .../iomanager/AsynchronousFileIOChannel.java    | 264 +++++++++
 .../io/disk/iomanager/BlockChannelAccess.java   | 272 ---------
 .../io/disk/iomanager/BlockChannelReader.java   |  87 +--
 .../io/disk/iomanager/BlockChannelWriter.java   |  89 +--
 .../BlockChannelWriterWithCallback.java         |  35 ++
 .../disk/iomanager/BulkBlockChannelReader.java  |  59 +-
 .../runtime/io/disk/iomanager/Channel.java      | 109 ----
 .../io/disk/iomanager/ChannelAccess.java        | 172 ------
 .../disk/iomanager/ChannelReaderInputView.java  |   3 +-
 .../disk/iomanager/ChannelWriterOutputView.java |   3 +-
 .../io/disk/iomanager/FileIOChannel.java        | 156 ++++++
 .../runtime/io/disk/iomanager/IOManager.java    | 547 ++-----------------
 .../io/disk/iomanager/IOManagerAsync.java       | 449 +++++++++++++++
 .../runtime/io/disk/iomanager/IORequest.java    |  18 +-
 .../io/disk/iomanager/QueuingCallback.java      |  47 ++
 .../io/disk/iomanager/RequestDoneCallback.java  |  36 ++
 .../runtime/io/disk/iomanager/RequestQueue.java |  22 +-
 .../iomanager/SynchronousFileIOChannel.java     |  45 ++
 .../iterative/io/SerializedUpdateBuffer.java    |   4 +-
 .../runtime/operators/hash/HashPartition.java   |   8 +-
 .../operators/hash/MutableHashTable.java        |   4 +-
 .../operators/hash/ReOpenableHashPartition.java |   6 +-
 .../hash/ReOpenableMutableHashTable.java        |   4 +-
 .../sort/CombiningUnilateralSortMerger.java     |  22 +-
 .../operators/sort/UnilateralSortMerger.java    |  57 +-
 .../flink/runtime/taskmanager/TaskManager.java  |   3 +-
 .../runtime/util/EnvironmentInformation.java    |   8 +
 .../apache/flink/runtime/blob/BlobKeyTest.java  |   2 +-
 .../flink/runtime/io/disk/ChannelViewsTest.java |  18 +-
 .../runtime/io/disk/SpillingBufferTest.java     |   4 +-
 .../io/disk/iomanager/IOManagerITCase.java      |   4 +-
 .../IOManagerPerformanceBenchmark.java          |  17 +-
 .../io/disk/iomanager/IOManagerTest.java        |  65 +--
 .../flink/runtime/operators/CrossTaskTest.java  |   7 +-
 .../operators/MatchTaskExternalITCase.java      |   5 +-
 .../flink/runtime/operators/MatchTaskTest.java  |   5 +-
 .../operators/hash/HashMatchIteratorITCase.java |   8 +-
 .../runtime/operators/hash/HashTableITCase.java |   9 +-
 .../hash/HashTablePerformanceComparison.java    |   3 +-
 .../hash/ReOpenableHashTableITCase.java         |   5 +-
 .../SpillingResettableIteratorTest.java         |   3 +-
 ...lingResettableMutableObjectIteratorTest.java |   3 +-
 .../CombiningUnilateralSortMergerITCase.java    |   3 +-
 .../operators/sort/ExternalSortITCase.java      |   3 +-
 .../sort/MassiveStringSortingITCase.java        |   5 +-
 .../sort/SortMergeMatchIteratorITCase.java      |  11 +-
 .../operators/testutils/DriverTestBase.java     |   3 +-
 .../operators/testutils/MockEnvironment.java    |   3 +-
 .../operators/util/HashVsSortMiniBenchmark.java |   5 +-
 55 files changed, 1776 insertions(+), 1449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
index 7eaf635..f38aa25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/ChannelReaderInputViewIterator.java
@@ -27,7 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.util.MutableObjectIterator;
@@ -46,14 +46,14 @@ public class ChannelReaderInputViewIterator<E> implements MutableObjectIterator<
 	private final List<MemorySegment> freeMemTarget;
 	
 	
-	public ChannelReaderInputViewIterator(IOManager ioAccess, Channel.ID channel, List<MemorySegment> segments,
+	public ChannelReaderInputViewIterator(IOManager ioAccess, FileIOChannel.ID channel, List<MemorySegment> segments,
 			List<MemorySegment> freeMemTarget, TypeSerializer<E> accessors, int numBlocks)
 	throws IOException
 	{
 		this(ioAccess, channel, new LinkedBlockingQueue<MemorySegment>(), segments, freeMemTarget, accessors, numBlocks);
 	}
 		
-	public ChannelReaderInputViewIterator(IOManager ioAccess, Channel.ID channel,  LinkedBlockingQueue<MemorySegment> returnQueue,
+	public ChannelReaderInputViewIterator(IOManager ioAccess, FileIOChannel.ID channel,  LinkedBlockingQueue<MemorySegment> returnQueue,
 			List<MemorySegment> segments, List<MemorySegment> freeMemTarget, TypeSerializer<E> accessors, int numBlocks)
 	throws IOException
 	{

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
new file mode 100644
index 0000000..ecb794e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public abstract class AbstractFileIOChannel implements FileIOChannel {
+
+	/** Logger object for channel and its subclasses */
+	protected static final Logger LOG = LoggerFactory.getLogger(FileIOChannel.class);
+	
+	/** The ID of the underlying channel. */
+	protected final FileIOChannel.ID id;
+	
+	/** A file channel for NIO access to the file. */
+	protected final FileChannel fileChannel;
+	
+	
+	/**
+	 * Creates a new channel to the path indicated by the given ID. The channel hands IO requests to
+	 * the given request queue to be processed.
+	 * 
+	 * @param channelID The id describing the path of the file that the channel accessed.
+	 * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
+	 *                     than in read-only mode.
+	 * @throws IOException Thrown, if the channel could no be opened.
+	 */
+	protected AbstractFileIOChannel(FileIOChannel.ID channelID, boolean writeEnabled) throws IOException {
+		this.id = Preconditions.checkNotNull(channelID);
+		
+		try {
+			@SuppressWarnings("resource")
+			RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");
+			this.fileChannel = file.getChannel();
+		}
+		catch (IOException e) {
+			throw new IOException("Channel to path '" + channelID.getPath() + "' could not be opened.", e);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the channel ID of this channel.
+	 * 
+	 * @return This channel's ID.
+	 */
+	@Override
+	public final FileIOChannel.ID getChannelID() {
+		return this.id;
+	}
+	
+	@Override
+	public abstract boolean isClosed();
+	
+	@Override
+	public abstract void close() throws IOException;
+	
+	@Override
+	public void deleteChannel() {
+		if (!isClosed() || this.fileChannel.isOpen()) {
+			throw new IllegalStateException("Cannot delete a channel that is open.");
+		}
+	
+		// make a best effort to delete the file. Don't report exceptions.
+		try {
+			File f = new File(this.id.getPath());
+			if (f.exists()) {
+				f.delete();
+			}
+		} catch (Throwable t) {}
+	}
+	
+	@Override
+	public void closeAndDelete() throws IOException {
+		try {
+			close();
+		} finally {
+			deleteChannel();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
new file mode 100644
index 0000000..35273f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A reader that reads data in blocks from a file channel. The reader reads the blocks into a 
+ * {@link org.apache.flink.core.memory.MemorySegment} in an asynchronous fashion. That is, a read
+ * request is not processed by the thread that issues it, but by an asynchronous reader thread. Once the read request
+ * is done, the asynchronous reader adds the full MemorySegment to a <i>return queue</i> where it can be popped by the
+ * worker thread, once it needs the data. The return queue is in this case a
+ * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
+ * if the request is still pending when the it requires the data. 
+ * <p>
+ * Typical pre-fetching reads are done by issuing the read requests early and popping the return queue once the data
+ * is actually needed.
+ * <p>
+ * The reader has no notion whether the size of the memory segments is actually the size of the blocks on disk,
+ * or even whether the file was written in blocks of the same size, or in blocks at all. Ensuring that the
+ * writing and reading is consistent with each other (same blocks sizes) is up to the programmer.  
+ */
+public class AsynchronousBlockReader extends AsynchronousFileIOChannel<ReadRequest> implements BlockChannelReader {
+	
+	private final LinkedBlockingQueue<MemorySegment> returnSegments;
+	
+	/**
+	 * Creates a new block channel reader for the given channel.
+	 *  
+	 * @param channelID The ID of the channel to read.
+	 * @param requestQueue The request queue of the asynchronous reader thread, to which the I/O requests
+	 *                     are added.
+	 * @param returnSegments The return queue, to which the full Memory Segments are added.
+	 * @throws IOException Thrown, if the underlying file channel could not be opened.
+	 */
+	protected AsynchronousBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue,
+			LinkedBlockingQueue<MemorySegment> returnSegments)
+	throws IOException
+	{
+		super(channelID, requestQueue, new QueuingCallback(returnSegments), false);
+		this.returnSegments = returnSegments;
+	}	
+
+	/**
+	 * Issues a read request, which will asynchronously fill the given segment with the next block in the
+	 * underlying file channel. Once the read request is fulfilled, the segment will be added to this reader's
+	 * return queue.
+	 *  
+	 * @param segment The segment to read the block into.
+	 * @throws IOException Thrown, when the reader encounters an I/O error. Due to the asynchronous nature of the
+	 *                     reader, the exception thrown here may have been caused by an earlier read request. 
+	 */
+	@Override
+	public void readBlock(MemorySegment segment) throws IOException {
+		// check the error state of this channel
+		checkErroneous();
+		
+		// write the current buffer and get the next one
+		// the statements have to be in this order to avoid incrementing the counter
+		// after the channel has been closed
+		this.requestsNotReturned.incrementAndGet();
+		if (this.closed || this.requestQueue.isClosed()) {
+			// if we found ourselves closed after the counter increment,
+			// decrement the counter again and do not forward the request
+			this.requestsNotReturned.decrementAndGet();
+			throw new IOException("The reader has been closed.");
+		}
+		this.requestQueue.add(new SegmentReadRequest(this, segment));
+	}
+	
+	/**
+	 * Gets the next memory segment that has been filled with data by the reader. This method blocks until
+	 * such a segment is available, or until an error occurs in the reader, or the reader is closed.
+	 * <p>
+	 * WARNING: If this method is invoked without any segment ever returning (for example, because the
+	 * {@link #readBlock(MemorySegment)} method has not been invoked appropriately), the method may block
+	 * forever.
+	 * 
+	 * @return The next memory segment from the reader's return queue.
+	 * @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
+	 */
+	@Override
+	public MemorySegment getNextReturnedSegment() throws IOException {
+		try {
+			while (true) {
+				final MemorySegment next = this.returnSegments.poll(1000, TimeUnit.MILLISECONDS);
+				if (next != null) {
+					return next;
+				} else {
+					if (this.closed) {
+						throw new IOException("The reader has been asynchronously closed.");
+					}
+					checkErroneous();
+				}
+			}
+		} catch (InterruptedException iex) {
+			throw new IOException("Reader was interrupted while waiting for the next returning segment.");
+		}
+	}
+	
+	/**
+	 * Gets the queue in which the full memory segments are queued after the asynchronous read
+	 * is complete.
+	 * 
+	 * @return The queue with the full memory segments.
+	 */
+	@Override
+	public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
+		return this.returnSegments;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
new file mode 100644
index 0000000..7e1681f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+public class AsynchronousBlockWriter extends AsynchronousBlockWriterWithCallback implements BlockChannelWriter {
+	
+	private final LinkedBlockingQueue<MemorySegment> returnSegments;
+	
+	/**
+	 * Creates a new block channel writer for the given channel.
+	 *  
+	 * @param channelID The ID of the channel to write to.
+	 * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests
+	 *                     are added.
+	 * @param returnSegments The return queue, to which the processed Memory Segments are added.
+	 * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
+	 */
+	protected AsynchronousBlockWriter(FileIOChannel.ID channelID, RequestQueue<WriteRequest> requestQueue,
+			LinkedBlockingQueue<MemorySegment> returnSegments)
+	throws IOException
+	{
+		super(channelID, requestQueue, new QueuingCallback(returnSegments));
+		this.returnSegments = returnSegments;
+	}
+	
+	/**
+	 * Gets the next memory segment that has been written and is available again.
+	 * This method blocks until such a segment is available, or until an error occurs in the writer, or the
+	 * writer is closed.
+	 * <p>
+	 * NOTE: If this method is invoked without any segment ever returning (for example, because the
+	 * {@link #writeBlock(MemorySegment)} method has not been invoked accordingly), the method may block
+	 * forever.
+	 * 
+	 * @return The next memory segment from the writers's return queue.
+	 * @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
+	 */
+	@Override
+	public MemorySegment getNextReturnedSegment() throws IOException {
+		try {
+			while (true) {
+				final MemorySegment next = returnSegments.poll(1000, TimeUnit.MILLISECONDS);
+				if (next != null) {
+					return next;
+				} else {
+					if (this.closed) {
+						throw new IOException("The writer has been closed.");
+					}
+					checkErroneous();
+				}
+			}
+		} catch (InterruptedException e) {
+			throw new IOException("Writer was interrupted while waiting for the next returning segment.");
+		}
+	}
+	
+	/**
+	 * Gets the queue in which the memory segments are queued after the asynchronous write is completed.
+	 * 
+	 * @return The queue with the written memory segments.
+	 */
+	@Override
+	public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
+		return this.returnSegments;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
new file mode 100644
index 0000000..6b6fb36
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockWriterWithCallback.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * An asynchronous implementation of the {@link BlockChannelWriterWithCallback} that queues I/O requests
+ * and calls a callback once they have been handled.
+ */
+public class AsynchronousBlockWriterWithCallback extends AsynchronousFileIOChannel<WriteRequest> implements BlockChannelWriterWithCallback {
+	
+	/**
+	 * Creates a new asynchronous block writer for the given channel.
+	 *  
+	 * @param channelID The ID of the channel to write to.
+	 * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests are added.
+	 * @param callback The callback to be invoked when requests are done.
+	 * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
+	 */
+	protected AsynchronousBlockWriterWithCallback(FileIOChannel.ID channelID, RequestQueue<WriteRequest> requestQueue,
+			RequestDoneCallback callback) throws IOException
+	{
+		super(channelID, requestQueue, callback, true);
+	}
+
+	/**
+	 * Issues a asynchronous write request to the writer.
+	 * 
+	 * @param segment The segment to be written.
+	 * @throws IOException Thrown, when the writer encounters an I/O error. Due to the asynchronous nature of the
+	 *                     writer, the exception thrown here may have been caused by an earlier write request. 
+	 */
+	@Override
+	public void writeBlock(MemorySegment segment) throws IOException {
+		// check the error state of this channel
+		checkErroneous();
+		
+		// write the current buffer and get the next one
+		this.requestsNotReturned.incrementAndGet();
+		if (this.closed || this.requestQueue.isClosed()) {
+			// if we found ourselves closed after the counter increment,
+			// decrement the counter again and do not forward the request
+			this.requestsNotReturned.decrementAndGet();
+			throw new IOException("The writer has been closed.");
+		}
+		this.requestQueue.add(new SegmentWriteRequest(this, segment));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
new file mode 100644
index 0000000..048f82f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBulkBlockReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ *
+ */
+public class AsynchronousBulkBlockReader extends AsynchronousFileIOChannel<ReadRequest> implements BulkBlockChannelReader {
+	
+	private final ArrayList<MemorySegment> returnBuffers;
+	
+	
+	protected AsynchronousBulkBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue, 
+			List<MemorySegment> sourceSegments, int numBlocks)
+	throws IOException
+	{
+		this (channelID, requestQueue, sourceSegments, numBlocks, new ArrayList<MemorySegment>(numBlocks));
+	}
+	
+	private AsynchronousBulkBlockReader(FileIOChannel.ID channelID, RequestQueue<ReadRequest> requestQueue, 
+			List<MemorySegment> sourceSegments, int numBlocks, ArrayList<MemorySegment> target)
+	throws IOException
+	{
+		super(channelID, requestQueue, new CollectingCallback(target), false);
+		this.returnBuffers = target;
+		
+		// sanity check
+		if (sourceSegments.size() < numBlocks) {
+			throw new IllegalArgumentException("The list of source memory segments must contain at least" +
+					" as many segments as the number of blocks to read.");
+		}
+		
+		// send read requests for all blocks
+		for (int i = 0; i < numBlocks; i++) {
+			readBlock(sourceSegments.remove(sourceSegments.size() - 1));
+		}
+	}
+	
+	private void readBlock(MemorySegment segment) throws IOException {
+		// check the error state of this channel
+		checkErroneous();
+		
+		// write the current buffer and get the next one
+		this.requestsNotReturned.incrementAndGet();
+		if (this.closed || this.requestQueue.isClosed()) {
+			// if we found ourselves closed after the counter increment,
+			// decrement the counter again and do not forward the request
+			this.requestsNotReturned.decrementAndGet();
+			throw new IOException("The reader has been closed.");
+		}
+		this.requestQueue.add(new SegmentReadRequest(this, segment));
+	}
+	
+	@Override
+	public List<MemorySegment> getFullSegments() {
+		synchronized (this.closeLock) {
+			if (!this.isClosed() || this.requestsNotReturned.get() > 0) {
+				throw new IllegalStateException("Full segments can only be obtained after the reader was properly closed.");
+			}
+		}
+		
+		return this.returnBuffers;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static final class CollectingCallback implements RequestDoneCallback {
+		
+		private final ArrayList<MemorySegment> list;
+
+		public CollectingCallback(ArrayList<MemorySegment> list) {
+			this.list = list;
+		}
+		
+		@Override
+		public void requestSuccessful(MemorySegment buffer) {
+			list.add(buffer);
+		}
+		
+		@Override
+		public void requestFailed(MemorySegment buffer, IOException e) {
+			list.add(buffer);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
new file mode 100644
index 0000000..098b334
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A base class for readers and writers that accept read or write requests for whole blocks.
+ * The request is delegated to an asynchronous I/O thread. After completion of the I/O request, the memory
+ * segment of the block is added to a collection to be returned.
+ * <p>
+ * The asynchrony of the access makes it possible to implement read-ahead or write-behind types of I/O accesses.
+ * 
+ * @param <R> The type of request (e.g. <tt>ReadRequest</tt> or <tt>WriteRequest</tt> issued by this access to the I/O threads.
+ */
+public abstract class AsynchronousFileIOChannel<R extends IORequest> extends AbstractFileIOChannel {
+	
+	/** The lock that is used during closing to synchronize the thread that waits for all
+	 * requests to be handled with the asynchronous I/O thread. */
+	protected final Object closeLock = new Object();
+	
+	/** A request queue for submitting asynchronous requests to the corresponding IO worker thread. */
+	protected final RequestQueue<R> requestQueue;
+	
+	/** An atomic integer that counts the number of requests that we still wait for to return. */
+	protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
+	
+	/** Hander for completed requests */
+	protected final RequestDoneCallback resultHander;
+	
+	/** An exception that was encountered by the asynchronous request handling thread.*/
+	protected volatile IOException exception;
+	
+	/** Flag marking this channel as closed */
+	protected volatile boolean closed;
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Creates a new channel access to the path indicated by the given ID. The channel accepts buffers to be
+	 * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers 
+	 * are returned by adding the to the given queue.
+	 * 
+	 * @param channelID The id describing the path of the file that the channel accessed.
+	 * @param requestQueue The queue that this channel hands its IO requests to.
+	 * @param callback The callback to be invoked when a request is done.
+	 * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
+	 *                     than in read-only mode.
+	 * @throws IOException Thrown, if the channel could no be opened.
+	 */
+	protected AsynchronousFileIOChannel(FileIOChannel.ID channelID, RequestQueue<R> requestQueue, 
+			RequestDoneCallback callback, boolean writeEnabled) throws IOException
+	{
+		super(channelID, writeEnabled);
+		
+		if (requestQueue == null) {
+			throw new NullPointerException();
+		}
+		
+		this.requestQueue = requestQueue;
+		this.resultHander = callback;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean isClosed() {
+		return this.closed;
+	}
+	
+	/**
+	 * Closes the reader and waits until all pending asynchronous requests are
+	 * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
+	 * 
+	 * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
+	 *                     the closing was interrupted.
+	 */
+	public void close() throws IOException {
+		// atomically set the close flag
+		synchronized (this.closeLock) {
+			if (this.closed) {
+				return;
+			}
+			this.closed = true;
+			
+			try {
+				// wait until as many buffers have been returned as were written
+				// only then is everything guaranteed to be consistent.{
+				while (this.requestsNotReturned.get() > 0) {
+					try {
+						// we add a timeout here, because it is not guaranteed that the
+						// decrementing during buffer return and the check here are deadlock free.
+						// the deadlock situation is however unlikely and caught by the timeout
+						this.closeLock.wait(1000);
+						checkErroneous();
+					}
+					catch (InterruptedException iex) {}
+				}
+			}
+			finally {
+				// close the file
+				if (this.fileChannel.isOpen()) {
+					this.fileChannel.close();
+				}
+			}
+		}
+	}
+	
+	/**
+	 * This method waits for all pending asynchronous requests to return. When the
+	 * last request has returned, the channel is closed and deleted.
+	 * <p>
+	 * Even if an exception interrupts the closing, such that not all request are handled,
+	 * the underlying <tt>FileChannel</tt> is closed and deleted.
+	 * 
+	 * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if the closing was interrupted.
+	 */
+	public void closeAndDelete() throws IOException {
+		try {
+			close();
+		}
+		finally {
+			deleteChannel();
+		}
+	}
+	
+	/**
+	 * Checks the exception state of this channel. The channel is erroneous, if one of its requests could not
+	 * be processed correctly.
+	 * 
+	 * @throws IOException Thrown, if the channel is erroneous. The thrown exception contains the original exception
+	 *                     that defined the erroneous state as its cause.
+	 */
+	public final void checkErroneous() throws IOException {
+		if (this.exception != null) {
+			throw this.exception;
+		}
+	}
+	
+	/**
+	 * Handles a processed <tt>Buffer</tt>. This method is invoked by the
+	 * asynchronous IO worker threads upon completion of the IO request with the
+	 * provided buffer and/or an exception that occurred while processing the request
+	 * for that buffer.
+	 * 
+	 * @param buffer The buffer to be processed.
+	 * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
+	 */
+	final void handleProcessedBuffer(MemorySegment buffer, IOException ex) {
+		// even if the callbacks throw an error, we need to maintain our bookkeeping
+		try {
+			if (ex != null && this.exception == null) {
+				this.exception = ex;
+				this.resultHander.requestFailed(buffer, ex);
+			}
+			else {
+				this.resultHander.requestSuccessful(buffer);
+			}
+		}
+		finally {
+			// decrement the number of missing buffers. If we are currently closing, notify the 
+			if (this.closed) {
+				synchronized (this.closeLock) {
+					int num = this.requestsNotReturned.decrementAndGet();
+					if (num == 0) {
+						this.closeLock.notifyAll();
+					}
+				}
+			}
+			else {
+				this.requestsNotReturned.decrementAndGet();
+			}
+		}
+	}
+}
+
+//--------------------------------------------------------------------------------------------
+
+/**
+ * Read request that reads an entire memory segment from a block reader.
+ */
+final class SegmentReadRequest implements ReadRequest {
+	
+	private final AsynchronousFileIOChannel<ReadRequest> channel;
+	
+	private final MemorySegment segment;
+	
+	protected SegmentReadRequest(AsynchronousFileIOChannel<ReadRequest> targetChannel, MemorySegment segment) {
+		this.channel = targetChannel;
+		this.segment = segment;
+	}
+
+	@Override
+	public void read() throws IOException {
+		final FileChannel c = this.channel.fileChannel;
+		if (c.size() - c.position() > 0) {
+			try {
+				final ByteBuffer wrapper = this.segment.wrap(0, this.segment.size());
+				this.channel.fileChannel.read(wrapper);
+			}
+			catch (NullPointerException npex) {
+				throw new IOException("Memory segment has been released.");
+			}
+		}
+	}
+
+	@Override
+	public void requestDone(IOException ioex) {
+		this.channel.handleProcessedBuffer(this.segment, ioex);
+	}
+}
+
+//--------------------------------------------------------------------------------------------
+
+/**
+ * Write request that writes an entire memory segment to the block writer.
+ */
+final class SegmentWriteRequest implements WriteRequest {
+	
+	private final AsynchronousFileIOChannel<WriteRequest> channel;
+	
+	private final MemorySegment segment;
+	
+	protected SegmentWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, MemorySegment segment) {
+		this.channel = targetChannel;
+		this.segment = segment;
+	}
+
+	@Override
+	public void write() throws IOException {
+		try {
+			this.channel.fileChannel.write(this.segment.wrap(0, this.segment.size()));
+		}
+		catch (NullPointerException npex) {
+			throw new IOException("Memory segment has been released.");
+		}
+	}
+
+	@Override
+	public void requestDone(IOException ioex) {
+		this.channel.handleProcessedBuffer(this.segment, ioex);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
deleted file mode 100644
index f19586d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelAccess.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.memory.MemorySegment;
-
-
-/**
- * A base class for readers and writers that accept read or write requests for whole blocks.
- * The request is delegated to an asynchronous I/O thread. After completion of the I/O request, the memory
- * segment of the block is added to a collection to be returned.
- * <p>
- * The asynchrony of the access makes it possible to implement read-ahead or write-behind types of I/O accesses.
- * 
- * 
- * @param <R> The type of request (e.g. <tt>ReadRequest</tt> or <tt>WriteRequest</tt> issued by this access to
- *            the I/O threads.
- * @param <C> The type of collection used to collect the segments from completed requests. Those segments are for
- *            example for write requests the written and reusable segments, and for read requests the now full
- *            and usable segments. The collection type may for example be a synchronized queue or an unsynchronized
- *            list. 
- */
-public abstract class BlockChannelAccess<R extends IORequest, C extends Collection<MemorySegment>> extends ChannelAccess<MemorySegment, R>
-{	
-	/**
-	 * The lock that is used during closing to synchronize the thread that waits for all
-	 * requests to be handled with the asynchronous I/O thread.
-	 */
-	protected final Object closeLock = new Object();
-	
-	/**
-	 * An atomic integer that counts the number of buffers we still wait for to return.
-	 */
-	protected final AtomicInteger requestsNotReturned = new AtomicInteger(0);
-	
-	/**
-	 * The collection gathering the processed buffers that are ready to be (re)used.
-	 */
-	protected final C returnBuffers;
-	
-	/**
-	 * Flag marking this channel as closed;
-	 */
-	protected volatile boolean closed;
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new channel access to the path indicated by the given ID. The channel accepts buffers to be
-	 * read/written and hands them to the asynchronous I/O thread. After being processed, the buffers 
-	 * are returned by adding the to the given queue.
-	 * 
-	 * @param channelID The id describing the path of the file that the channel accessed.
-	 * @param requestQueue The queue that this channel hands its IO requests to.
-	 * @param returnQueue The queue to which the segments are added after their buffer was written.
-	 * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
-	 *                     than in read-only mode.
-	 * @throws IOException Thrown, if the channel could no be opened.
-	 */
-	protected BlockChannelAccess(Channel.ID channelID, RequestQueue<R> requestQueue,
-			C returnQueue, boolean writeEnabled)
-	throws IOException
-	{
-		super(channelID, requestQueue, writeEnabled);
-		
-		if (requestQueue == null) {
-			throw new NullPointerException();
-		}
-		
-		this.returnBuffers = returnQueue;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the queue (or list) to which the asynchronous reader adds its elements.
-	 * 
-	 * @return The queue (or list) to which the asynchronous reader adds its elements.
-	 */
-	public C getReturnQueue()
-	{
-		return this.returnBuffers;
-	}
-	
-
-	@Override
-	public boolean isClosed()
-	{
-		return this.closed;
-	}
-	
-	/**
-	 * Closes the reader and waits until all pending asynchronous requests are
-	 * handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is
-	 * closed.
-	 * 
-	 * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
-	 *                     the closing was interrupted.
-	 */
-	public void close() throws IOException
-	{
-		// atomically set the close flag
-		synchronized (this.closeLock) {
-			if (this.closed) {
-				return;
-			}
-			this.closed = true;
-			
-			try {
-				// wait until as many buffers have been returned as were written
-				// only then is everything guaranteed to be consistent.{
-				while (this.requestsNotReturned.get() > 0) {
-					try {
-						// we add a timeout here, because it is not guaranteed that the
-						// decrementing during buffer return and the check here are deadlock free.
-						// the deadlock situation is however unlikely and caught by the timeout
-						this.closeLock.wait(1000);
-						checkErroneous();
-					}
-					catch (InterruptedException iex) {}
-				}
-			}
-			finally {
-				// close the file
-				if (this.fileChannel.isOpen()) {
-					this.fileChannel.close();
-				}
-			}
-		}
-	}
-	
-	/**
-	 * This method waits for all pending asynchronous requests to return. When the
-	 * last request has returned, the channel is closed and deleted.
-	 * 
-	 * Even if an exception interrupts the closing, such that not all request are handled,
-	 * the underlying <tt>FileChannel</tt> is closed and deleted.
-	 * 
-	 * @throws IOException Thrown, if an I/O exception occurred while waiting for the buffers, or if
-	 *                     the closing was interrupted.
-	 */
-	public void closeAndDelete() throws IOException
-	{
-		try {
-			close();
-		}
-		finally {
-			deleteChannel();
-		}
-	}
-	
-
-	@Override
-	protected void returnBuffer(MemorySegment buffer)
-	{
-		this.returnBuffers.add(buffer);
-		
-		// decrement the number of missing buffers. If we are currently closing, notify the 
-		if (this.closed) {
-			synchronized (this.closeLock) {
-				int num = this.requestsNotReturned.decrementAndGet();
-				if (num == 0) {
-					this.closeLock.notifyAll();
-				}
-			}
-		}
-		else {
-			this.requestsNotReturned.decrementAndGet();
-		}
-	}
-}
-
-//--------------------------------------------------------------------------------------------
-
-/**
- * Special read request that reads an entire memory segment from a block reader.
- */
-final class SegmentReadRequest implements ReadRequest
-{
-	private final BlockChannelAccess<ReadRequest, ?> channel;
-	
-	private final MemorySegment segment;
-	
-	protected SegmentReadRequest(BlockChannelAccess<ReadRequest, ?> targetChannel, MemorySegment segment)
-	{
-		this.channel = targetChannel;
-		this.segment = segment;
-	}
-
-
-	@Override
-	public void read() throws IOException
-	{
-		final FileChannel c = this.channel.fileChannel;
-		if (c.size() - c.position() > 0) {
-			try {
-				final ByteBuffer wrapper = this.segment.wrap(0, this.segment.size());
-				this.channel.fileChannel.read(wrapper);
-			} catch (NullPointerException npex) {
-				// the memory has been cleared asynchronouosly through task failing or canceling
-				// ignore the request, since the result cannot be read
-			}
-		}
-	}
-
-
-	@Override
-	public void requestDone(IOException ioex)
-	{
-		this.channel.handleProcessedBuffer(this.segment, ioex);
-	}
-}
-
-//--------------------------------------------------------------------------------------------
-
-/**
- * Special write request that writes an entire memory segment to the block writer.
- */
-final class SegmentWriteRequest implements WriteRequest
-{
-	private final BlockChannelAccess<WriteRequest, ?> channel;
-	
-	private final MemorySegment segment;
-	
-	protected SegmentWriteRequest(BlockChannelAccess<WriteRequest, ?> targetChannel, MemorySegment segment)
-	{
-		this.channel = targetChannel;
-		this.segment = segment;
-	}
-
-
-	@Override
-	public void write() throws IOException
-	{
-		try {
-			this.channel.fileChannel.write(this.segment.wrap(0, this.segment.size()));
-		} catch (NullPointerException npex) {
-			// the memory has been cleared asynchronouosly through task failing or canceling
-			// ignore the request, since there is nothing to write.
-		}
-	}
-
-
-	@Override
-	public void requestDone(IOException ioex)
-	{
-		this.channel.handleProcessedBuffer(this.segment, ioex);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
index f674ad4..f25827a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
@@ -16,76 +16,30 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.core.memory.MemorySegment;
 
-
 /**
  * A reader that reads data in blocks from a file channel. The reader reads the blocks into a 
- * {@link org.apache.flink.core.memory.MemorySegment} in an asynchronous fashion. That is, a read
- * request is not processed by the thread that issues it, but by an asynchronous reader thread. Once the read request
- * is done, the asynchronous reader adds the full MemorySegment to a <i>return queue</i> where it can be popped by the
- * worker thread, once it needs the data. The return queue is in this case a
- * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
- * if the request is still pending when the it requires the data. 
- * <p>
- * Typical pre-fetching reads are done by issuing the read requests early and popping the return queue once the data
- * is actually needed.
- * <p>
- * The reader has no notion whether the size of the memory segments is actually the size of the blocks on disk,
- * or even whether the file was written in blocks of the same size, or in blocks at all. Ensuring that the
- * writing and reading is consistent with each other (same blocks sizes) is up to the programmer.  
+ * {@link org.apache.flink.core.memory.MemorySegment}. To support asynchronous implementations,
+ * the read method does not immediately return the full memory segment, but rather adds it to
+ * a blocking queue of finished read operations.
  */
-public class BlockChannelReader extends BlockChannelAccess<ReadRequest, LinkedBlockingQueue<MemorySegment>>
-{
-	/**
-	 * Creates a new block channel reader for the given channel.
-	 *  
-	 * @param channelID The ID of the channel to read.
-	 * @param requestQueue The request queue of the asynchronous reader thread, to which the I/O requests
-	 *                     are added.
-	 * @param returnSegments The return queue, to which the full Memory Segments are added.
-	 * @throws IOException Thrown, if the underlying file channel could not be opened.
-	 */
-	protected BlockChannelReader(Channel.ID channelID, RequestQueue<ReadRequest> requestQueue,
-			LinkedBlockingQueue<MemorySegment> returnSegments, int numRequestsToBundle)
-	throws IOException
-	{
-		super(channelID, requestQueue, returnSegments, false);
-	}	
+public interface BlockChannelReader extends FileIOChannel {
 
 	/**
-	 * Issues a read request, which will asynchronously fill the given segment with the next block in the
+	 * Issues a read request, which will fill the given segment with the next block in the
 	 * underlying file channel. Once the read request is fulfilled, the segment will be added to this reader's
 	 * return queue.
 	 *  
 	 * @param segment The segment to read the block into.
-	 * @throws IOException Thrown, when the reader encounters an I/O error. Due to the asynchronous nature of the
-	 *                     reader, the exception thrown here may have been caused by an earlier read request. 
+	 * @throws IOException Thrown, when the reader encounters an I/O error.
 	 */
-	public void readBlock(MemorySegment segment) throws IOException
-	{
-		// check the error state of this channel
-		checkErroneous();
-		
-		// write the current buffer and get the next one
-		// the statements have to be in this order to avoid incrementing the counter
-		// after the channel has been closed
-		this.requestsNotReturned.incrementAndGet();
-		if (this.closed || this.requestQueue.isClosed()) {
-			// if we found ourselves closed after the counter increment,
-			// decrement the counter again and do not forward the request
-			this.requestsNotReturned.decrementAndGet();
-			throw new IOException("The reader has been closed.");
-		}
-		this.requestQueue.add(new SegmentReadRequest(this, segment));
-	}
+	void readBlock(MemorySegment segment) throws IOException;
 	
 	/**
 	 * Gets the next memory segment that has been filled with data by the reader. This method blocks until
@@ -98,22 +52,13 @@ public class BlockChannelReader extends BlockChannelAccess<ReadRequest, LinkedBl
 	 * @return The next memory segment from the reader's return queue.
 	 * @throws IOException Thrown, if an I/O error occurs in the reader while waiting for the request to return.
 	 */
-	public MemorySegment getNextReturnedSegment() throws IOException
-	{
-		try {
-			while (true) {
-				final MemorySegment next = this.returnBuffers.poll(2000, TimeUnit.MILLISECONDS);
-				if (next != null) {
-					return next;
-				} else {
-					if (this.closed) {
-						throw new IOException("The reader has been asynchronously closed.");
-					}
-					checkErroneous();
-				}
-			}
-		} catch (InterruptedException iex) {
-			throw new IOException("Reader was interrupted while waiting for the next returning segment.");
-		}
-	}
+	public MemorySegment getNextReturnedSegment() throws IOException;
+	
+	/**
+	 * Gets the queue in which the full memory segments are queued after the read is complete.
+	 * 
+	 * @return The queue with the full memory segments.
+	 */
+	LinkedBlockingQueue<MemorySegment> getReturnQueue();
 }
+	
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
index 44a2edb..25c74e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriter.java
@@ -16,101 +16,40 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
-
 import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.core.memory.MemorySegment;
 
-
 /**
  * A writer that writes data in blocks to a file channel. The writer receives the data blocks in the form of 
  * {@link org.apache.flink.core.memory.MemorySegment}, which it writes entirely to the channel,
- * regardless of how space in the segment is used. The writing happens in an asynchronous fashion. That is, a write
- * request is not processed by the thread that issues it, but by an asynchronous writer thread. Once the request
- * is done, the asynchronous writer adds the MemorySegment to a <i>return queue</i> where it can be popped by the
- * worker thread, to be reused. The return queue is in this case a
- * {@link java.util.concurrent.LinkedBlockingQueue}, such that the working thread blocks until the request has been served,
- * if the request is still pending when the it requires the segment back. 
- * <p>
- * Typical write behind is realized, by having a small set of segments in the return queue at all times. When a
- * memory segment must be written, the request is issued to the writer and a new segment is immediately popped from
- * the return queue. Once too many requests have been issued and the I/O thread cannot keep up, the working thread
- * naturally blocks until another segment is available again.
+ * regardless of how space in the segment is used. The writing may be realized synchronously, or asynchronously,
+ * depending on the implementation.
  */
-public class BlockChannelWriter extends BlockChannelAccess<WriteRequest, LinkedBlockingQueue<MemorySegment>>
-{
-	/**
-	 * Creates a new block channel writer for the given channel.
-	 *  
-	 * @param channelID The ID of the channel to write to.
-	 * @param requestQueue The request queue of the asynchronous writer thread, to which the I/O requests
-	 *                     are added.
-	 * @param returnSegments The return queue, to which the processed Memory Segments are added.
-	 * @throws IOException Thrown, if the underlying file channel could not be opened exclusively.
-	 */
-	protected BlockChannelWriter(Channel.ID channelID, RequestQueue<WriteRequest> requestQueue,
-			LinkedBlockingQueue<MemorySegment> returnSegments, int numRequestsToBundle)
-	throws IOException
-	{
-		super(channelID, requestQueue, returnSegments, true);
-	}
-
-	/**
-	 * Issues a asynchronous write request to the writer.
-	 * 
-	 * @param segment The segment to be written.
-	 * @throws IOException Thrown, when the writer encounters an I/O error. Due to the asynchronous nature of the
-	 *                     writer, the exception thrown here may have been caused by an earlier write request. 
-	 */
-	public void writeBlock(MemorySegment segment) throws IOException
-	{
-		// check the error state of this channel
-		checkErroneous();
-		
-		// write the current buffer and get the next one
-		this.requestsNotReturned.incrementAndGet();
-		if (this.closed || this.requestQueue.isClosed()) {
-			// if we found ourselves closed after the counter increment,
-			// decrement the counter again and do not forward the request
-			this.requestsNotReturned.decrementAndGet();
-			throw new IOException("The writer has been closed.");
-		}
-		this.requestQueue.add(new SegmentWriteRequest(this, segment));
-	}
+public interface BlockChannelWriter extends BlockChannelWriterWithCallback {
 	
 	/**
 	 * Gets the next memory segment that has been written and is available again.
 	 * This method blocks until such a segment is available, or until an error occurs in the writer, or the
 	 * writer is closed.
 	 * <p>
-	 * WARNING: If this method is invoked without any segment ever returning (for example, because the
-	 * {@link #writeBlock(MemorySegment)} method has not been invoked appropriately), the method may block
+	 * NOTE: If this method is invoked without any segment ever returning (for example, because the
+	 * {@link #writeBlock(MemorySegment)} method has not been invoked accordingly), the method may block
 	 * forever.
 	 * 
 	 * @return The next memory segment from the writers's return queue.
 	 * @throws IOException Thrown, if an I/O error occurs in the writer while waiting for the request to return.
 	 */
-	public MemorySegment getNextReturnedSegment() throws IOException
-	{
-		try {
-			while (true) {
-				final MemorySegment next = this.returnBuffers.poll(2000, TimeUnit.MILLISECONDS);
-				if (next != null) {
-					return next;
-				} else {
-					if (this.closed) {
-						throw new IOException("The writer has been closed.");
-					}
-					checkErroneous();
-				}
-			}
-		} catch (InterruptedException iex) {
-			throw new IOException("Writer was interrupted while waiting for the next returning segment.");
-		}
-	}
+	MemorySegment getNextReturnedSegment() throws IOException;
+	
+	/**
+	 * Gets the queue in which the memory segments are queued after the asynchronous write
+	 * is completed
+	 * 
+	 * @return The queue with the written memory segments.
+	 */
+	LinkedBlockingQueue<MemorySegment> getReturnQueue();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
new file mode 100644
index 0000000..57bc7e0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelWriterWithCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+public interface BlockChannelWriterWithCallback extends FileIOChannel {
+	
+	/**
+	 * Writes the given memory segment. The request may be executed synchronously, or asynchronously, depending
+	 * on the implementation.
+	 * 
+	 * @param segment The segment to be written.
+	 * @throws IOException Thrown, when the writer encounters an I/O error.
+	 */
+	void writeBlock(MemorySegment segment) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
index 3be85d1..84883e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BulkBlockChannelReader.java
@@ -16,71 +16,16 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.core.memory.MemorySegment;
 
-
 /**
  *
- *
  */
-public class BulkBlockChannelReader extends BlockChannelAccess<ReadRequest, ArrayList<MemorySegment>>
-{
+public interface BulkBlockChannelReader extends FileIOChannel {
 	
-	
-	protected BulkBlockChannelReader(Channel.ID channelID, RequestQueue<ReadRequest> requestQueue, 
-			List<MemorySegment> sourceSegments, int numBlocks)
-	throws IOException
-	{
-		super(channelID, requestQueue, new ArrayList<MemorySegment>(numBlocks), false);
-		
-		// sanity check
-		if (sourceSegments.size() < numBlocks) {
-			throw new IllegalArgumentException("The list of source memory segments must contain at least" +
-					" as many segments as the number of blocks to read.");
-		}
-		
-		// send read requests for all blocks
-		for (int i = 0; i < numBlocks; i++) {
-			readBlock(sourceSegments.remove(sourceSegments.size() - 1));
-		}
-	}
-	
-
-	
-	private void readBlock(MemorySegment segment) throws IOException
-	{
-		// check the error state of this channel
-		checkErroneous();
-		
-		// write the current buffer and get the next one
-		this.requestsNotReturned.incrementAndGet();
-		if (this.closed || this.requestQueue.isClosed()) {
-			// if we found ourselves closed after the counter increment,
-			// decrement the counter again and do not forward the request
-			this.requestsNotReturned.decrementAndGet();
-			throw new IOException("The reader has been closed.");
-		}
-		this.requestQueue.add(new SegmentReadRequest(this, segment));
-	}
-	
-	public List<MemorySegment> getFullSegments()
-	{
-		synchronized (this.closeLock) {
-			if (!this.isClosed() || this.requestsNotReturned.get() > 0) {
-				throw new IllegalStateException("Full segments can only be obtained after the reader was properly closed.");
-			}
-		}
-		
-		return this.returnBuffers;
-	}
-
+	List<MemorySegment> getFullSegments();
 }
-
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
deleted file mode 100644
index 7e64e79..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/Channel.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-import java.io.File;
-import java.util.Random;
-
-import org.apache.flink.util.StringUtils;
-
-/**
- * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
- * files that contain sorted runs of data from the same stream, that will later on be merged together.
- * 
- */
-public final class Channel
-{
-	private static final int RANDOM_BYTES_LENGTH = 16;
-
-	/**
-	 * An ID identifying an underlying fileChannel.
-	 * 
-	 */
-	public static class ID
-	{
-		private final String path;
-		
-		private final int threadNum;
-
-		protected ID(final String path, final int threadNum) {
-			this.path = path;
-			this.threadNum = threadNum;
-		}
-
-		protected ID(final String basePath, final int threadNum, final Random random)
-		{
-			this.path = basePath + File.separator + randomString(random) + ".channel";
-			this.threadNum = threadNum;
-		}
-
-		/**
-		 * Returns the path to the underlying temporary file.
-		 */
-		public String getPath() {
-			return path;
-		}
-		
-		int getThreadNum() {
-			return this.threadNum;
-		}
-
-		public String toString() {
-			return path;
-		}
-	}
-
-	public static final class Enumerator
-	{
-		private static final String FORMAT = "%s%s%s.%06d.channel";
-
-		private final String[] paths;
-		
-		private final String namePrefix;
-
-		private int counter;
-
-		protected Enumerator(final String[] basePaths, final Random random)
-		{
-			this.paths = basePaths;
-			this.namePrefix = randomString(random);
-			this.counter = 0;
-		}
-
-		public ID next()
-		{
-			final int threadNum = counter % paths.length;
-			return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
-		}
-	}
-
-	/**
-	 * Creates a random byte sequence using the provided {@code random} generator and returns its hex representation.
-	 * 
-	 * @param random
-	 *        The random number generator to be used.
-	 * @return A hex representation of the generated byte sequence
-	 */
-	private static final String randomString(final Random random) {
-		final byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
-		random.nextBytes(bytes);
-		return StringUtils.byteToHexString(bytes);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
deleted file mode 100644
index 2b5b34d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelAccess.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.disk.iomanager;
-
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-
-
-/**
- * A base class for readers and writers that read data from I/O manager channels, or write data to them.
- * Requests handled by channels that inherit from this class are executed asynchronously, which allows
- * write-behind for writers and pre-fetching for readers.
- * 
- * 
- * @param <T> The buffer type used for the underlying IO operations.
- */
-public abstract class ChannelAccess<T, R extends IORequest>
-{
-	/**
-	 * The ID of the underlying channel.
-	 */
-	protected final Channel.ID id;
-
-	/**
-	 * A file channel for NIO access to the file.
-	 */
-	protected final FileChannel fileChannel;
-	
-	/**
-	 * A request queue for submitting asynchronous requests to the corresponding
-	 * IO worker thread.
-	 */
-	protected final RequestQueue<R> requestQueue;
-	
-	/**
-	 * An exception that was encountered by the asynchronous request handling thread.
-	 */
-	protected volatile IOException exception;
-	
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a new channel to the path indicated by the given ID. The channel hands IO requests to
-	 * the given request queue to be processed.
-	 * 
-	 * @param channelID The id describing the path of the file that the channel accessed.
-	 * @param requestQueue The queue that this channel hands its IO requests to.
-	 * @param writeEnabled Flag describing whether the channel should be opened in read/write mode, rather
-	 *                     than in read-only mode.
-	 * @throws IOException Thrown, if the channel could no be opened.
-	 */
-	protected ChannelAccess(Channel.ID channelID, RequestQueue<R> requestQueue, boolean writeEnabled)
-	throws IOException
-	{
-		if (channelID == null || requestQueue == null) {
-			throw new NullPointerException();
-		}
-		
-		this.id = channelID;
-		this.requestQueue = requestQueue;
-		
-		try {
-			@SuppressWarnings("resource")
-			RandomAccessFile file = new RandomAccessFile(id.getPath(), writeEnabled ? "rw" : "r");
-			this.fileChannel = file.getChannel();
-		}
-		catch (IOException e) {
-			throw new IOException("Channel to path '" + channelID.getPath() + "' could not be opened.", e);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Checks, whether this channel has been closed;
-	 * 
-	 * @return True, if the channel has been closed, false otherwise.
-	 */
-	public abstract boolean isClosed();
-	
-	/**
-	 * This method is invoked by the asynchronous I/O thread to return a buffer after the I/O request
-	 * completed.
-	 * 
-	 * @param buffer The buffer to be returned.
-	 */
-	protected abstract void returnBuffer(T buffer); 
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Gets the channel ID of this channel.
-	 * 
-	 * @return This channel's ID.
-	 */
-	public final Channel.ID getChannelID()
-	{
-		return this.id;
-	}
-	
-	/**
-	 * Checks the exception state of this channel. The channel is erroneous, if one of its requests could not
-	 * be processed correctly.
-	 * 
-	 * @throws IOException Thrown, if the channel is erroneous. The thrown exception contains the original exception
-	 *                     that defined the erroneous state as its cause.
-	 */
-	public final void checkErroneous() throws IOException
-	{
-		if (this.exception != null) {
-			throw new IOException("The channel is erroneous.", this.exception);
-		}
-	}
-	
-	/**
-	 * Deletes this channel by physically removing the file beneath it.
-	 * This method may only be called on a closed channel.
-	 */
-	public void deleteChannel()
-	{
-		if (this.fileChannel.isOpen()) {
-			throw new IllegalStateException("Cannot delete a channel that is open.");
-		}
-	
-		// make a best effort to delete the file. Don't report exceptions.
-		try {
-			File f = new File(this.id.getPath());
-			if (f.exists()) {
-				f.delete();
-			}
-		} catch (Throwable t) {}
-	}
-	
-	/**
-	 * Handles a processed <tt>Buffer</tt>. This method is invoked by the
-	 * asynchronous IO worker threads upon completion of the IO request with the
-	 * provided buffer and/or an exception that occurred while processing the request
-	 * for that buffer.
-	 * 
-	 * @param buffer The buffer to be processed.
-	 * @param ex The exception that occurred in the I/O threads when processing the buffer's request.
-	 */
-	final void handleProcessedBuffer(T buffer, IOException ex) {
-		
-		if (ex != null && this.exception == null) {
-			this.exception = ex;
-		}
-		
-		returnBuffer(buffer);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
index 25aa289..d85ec82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelReaderInputView.java
@@ -164,8 +164,7 @@ public class ChannelReaderInputView extends AbstractPagedInputView {
 	 * @return A list containing all memory segments originally supplied to this view.
 	 * @throws IOException Thrown, if the underlying reader could not be properly closed.
 	 */
-	public List<MemorySegment> close() throws IOException
-	{	
+	public List<MemorySegment> close() throws IOException {	
 		if (this.closed) {
 			throw new IllegalStateException("Already closed.");
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
index f230333..9824d34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/ChannelWriterOutputView.java
@@ -81,8 +81,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {
 	 * @param memory The memory used to buffer data, or null, to utilize solely the return queue.
 	 * @param segmentSize The size of the memory segments.
 	 */
-	public ChannelWriterOutputView(BlockChannelWriter writer, List<MemorySegment> memory, int segmentSize)
-	{
+	public ChannelWriterOutputView(BlockChannelWriter writer, List<MemorySegment> memory, int segmentSize) {
 		super(segmentSize, HEADER_LENGTH);
 		
 		if (writer == null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
new file mode 100644
index 0000000..7c9d31b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.util.StringUtils;
+
+/**
+ * A Channel represents a collection of files that belong logically to the same resource. An example is a collection of
+ * files that contain sorted runs of data from the same stream, that will later on be merged together.
+ */
+public interface FileIOChannel {
+	
+	/**
+	 * Gets the channel ID of this I/O channel.
+	 * 
+	 * @return The channel ID.
+	 */
+	FileIOChannel.ID getChannelID();
+	
+	/**
+	 * Checks whether the channel has been closed.
+	 * 
+	 * @return True if the channel has been closed, false otherwise.
+	 */
+	boolean isClosed();
+
+	/**
+	* Closes the channel. For asynchronous implementations, this method waits until all pending requests are
+	* handled. Even if an exception interrupts the closing, the underlying <tt>FileChannel</tt> is closed.
+	* 
+	* @throws IOException Thrown, if an error occurred while waiting for pending requests.
+	*/
+	void close() throws IOException;
+
+	/**
+	 * Deletes the file underlying this I/O channel.
+	 *  
+	 * @throws IllegalStateException Thrown, when the channel is still open.
+	 */
+	void deleteChannel();
+	
+	/**
+	* Closes the channel and deletes the underlying file.
+	* For asynchronous implementations, this method waits until all pending requests are handled;
+	* 
+	* @throws IOException Thrown, if an error occurred while waiting for pending requests.
+	*/
+	public void closeAndDelete() throws IOException;
+	
+	// --------------------------------------------------------------------------------------------
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * An ID identifying an underlying file channel.
+	 */
+	public static class ID {
+		
+		private static final int RANDOM_BYTES_LENGTH = 16;
+		
+		private final String path;
+		
+		private final int threadNum;
+
+		protected ID(String path, int threadNum) {
+			this.path = path;
+			this.threadNum = threadNum;
+		}
+
+		protected ID(String basePath, int threadNum, Random random) {
+			this.path = basePath + File.separator + randomString(random) + ".channel";
+			this.threadNum = threadNum;
+		}
+
+		/**
+		 * Returns the path to the underlying temporary file.
+		 */
+		public String getPath() {
+			return path;
+		}
+		
+		int getThreadNum() {
+			return this.threadNum;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof ID) {
+				ID other = (ID) obj;
+				return this.path.equals(other.path) && this.threadNum == other.threadNum;
+			} else {
+				return false;
+			}
+		}
+		
+		@Override
+		public int hashCode() {
+			return path.hashCode();
+		}
+		
+		@Override
+		public String toString() {
+			return path;
+		}
+		
+		private static final String randomString(final Random random) {
+			final byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
+			random.nextBytes(bytes);
+			return StringUtils.byteToHexString(bytes);
+		}
+	}
+
+	/**
+	 * An enumerator for channels that logically belong together.
+	 */
+	public static final class Enumerator {
+		
+		private static final String FORMAT = "%s%s%s.%06d.channel";
+
+		private final String[] paths;
+		
+		private final String namePrefix;
+
+		private int counter;
+
+		protected Enumerator(String[] basePaths, Random random) {
+			this.paths = basePaths;
+			this.namePrefix = ID.randomString(random);
+			this.counter = 0;
+		}
+
+		public ID next() {
+			final int threadNum = counter % paths.length;
+			return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
+		}
+	}
+}


Mime
View raw message