flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/3] incubator-flink git commit: [FLINK-1323] Refactor I/O Manager Readers and Writers to interfaces, add implementation that uses callbacks on completed write requests.
Date Tue, 11 Nov 2014 10:48:49 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 388773d..f77d9c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -32,194 +30,71 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * The facade for the provided I/O manager services.
  */
-public class IOManager implements UncaughtExceptionHandler {
+public abstract class IOManager {
 	
 	/** Logging */
-	private static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
+	protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
-	/**
-	 * The default temp paths for anonymous Channels.
-	 */
+	/** The temporary directories for files */
 	private final String[] paths;
 
-	/**
-	 * A random number generator for the anonymous ChannelIDs.
-	 */
+	/** A random number generator for the anonymous ChannelIDs. */
 	private final Random random;
-
-	/**
-	 * The writer thread used for asynchronous block oriented channel writing.
-	 */
-	private final WriterThread[] writers;
-
-	/**
-	 * The reader threads used for asynchronous block oriented channel reading.
-	 */
-	private final ReaderThread[] readers;
 	
-	/**
-	 * The number of the next path to use.
-	 */
+	/** The number of the next path to use. */
 	private volatile int nextPath;
-
-	/**
-	 * A boolean flag indicating whether the close() has already been invoked.
-	 */
-	private volatile boolean isClosed = false;
-
 	
 	// -------------------------------------------------------------------------
 	//               Constructors / Destructors
 	// -------------------------------------------------------------------------
 
 	/**
-	 * Constructs a new IOManager, writing channels to the system directory.
-	 */
-	public IOManager() {
-		this(System.getProperty("java.io.tmpdir"));
-	}
-	
-	/**
-	 * Constructs a new IOManager.
-	 * 
-	 * @param tempDir The base directory path for files underlying channels.
-	 */
-	public IOManager(String tempDir) {
-		this(new String[] {tempDir});
-	}
-
-	/**
 	 * Constructs a new IOManager.
 	 * 
 	 * @param paths
 	 *        the basic directory paths for files underlying anonymous channels.
 	 */
-	public IOManager(String[] paths) {
+	protected IOManager(String[] paths) {
 		this.paths = paths;
 		this.random = new Random();
 		this.nextPath = 0;
-		
-		// start a write worker thread for each directory
-		this.writers = new WriterThread[paths.length];
-		for (int i = 0; i < this.writers.length; i++) {
-			final WriterThread t = new WriterThread();
-			this.writers[i] = t;
-			t.setName("IOManager writer thread #" + (i + 1));
-			t.setDaemon(true);
-			t.setUncaughtExceptionHandler(this);
-			t.start();
-		}
-
-		// start a reader worker thread for each directory
-		this.readers = new ReaderThread[paths.length];
-		for (int i = 0; i < this.readers.length; i++) {
-			final ReaderThread t = new ReaderThread();
-			this.readers[i] = t;
-			t.setName("IOManager reader thread #" + (i + 1));
-			t.setDaemon(true);
-			t.setUncaughtExceptionHandler(this);
-			t.start();
-		}
 	}
 
 	/**
-	 * Close method. Shuts down the reader and writer threads immediately, not waiting for their
-	 * pending requests to be served. This method waits until the threads have actually ceased their
-	 * operation.
+	 * Close method, marks the I/O manager as closed.
 	 */
-	public synchronized final void shutdown()
-	{
-		if (!this.isClosed) {
-			this.isClosed = true;
-
-			// close writing and reading threads with best effort and log problems
-			
-			// --------------------------------- writer shutdown ----------------------------------			
-			for (int i = 0; i < this.readers.length; i++) {
-				try {
-					this.writers[i].shutdown();
-				}
-				catch (Throwable t) {
-					LOG.error("Error while shutting down IO Manager writer thread.", t);
-				}
-			}
-
-			// --------------------------------- reader shutdown ----------------------------------
-			for (int i = 0; i < this.readers.length; i++) {
-				try {
-					this.readers[i].shutdown();
-				}
-				catch (Throwable t) {
-					LOG.error("Error while shutting down IO Manager reader thread.", t);
-				}
-			}
-			
-			// ------------------------ wait until shutdown is complete ---------------------------
-			try {
-				for (int i = 0; i < this.readers.length; i++) {
-					this.writers[i].join();
-				}
-				for (int i = 0; i < this.readers.length; i++) {
-					this.readers[i].join();
-				}
-			}
-			catch (InterruptedException iex) {}
-		}
-	}
+	public abstract void shutdown();
 	
 	/**
-	 * Utility method to check whether the IO manager has been properly shut down. The IO manager is considered
-	 * to be properly shut down when it is closed and its threads have ceased operation.
+	 * Utility method to check whether the IO manager has been properly shut down.
 	 * 
 	 * @return True, if the IO manager has properly shut down, false otherwise.
 	 */
-	public final boolean isProperlyShutDown()
-	{
-		boolean readersShutDown = true;
-		for (int i = 0; i < this.readers.length; i++) {
-			readersShutDown &= this.readers[i].getState() == Thread.State.TERMINATED;
-		}
-		
-		boolean writersShutDown = true;
-		for (int i = 0; i < this.writers.length; i++) {
-			readersShutDown &= this.writers[i].getState() == Thread.State.TERMINATED;
-		}
-		
-		return this.isClosed && writersShutDown && readersShutDown;
-	}
-
-
-	@Override
-	public void uncaughtException(Thread t, Throwable e) {
-		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
-		shutdown();	
-	}
+	public abstract boolean isProperlyShutDown();
 
 	// ------------------------------------------------------------------------
 	//                          Channel Instantiations
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * Creates a new {@link Channel.ID} in one of the temp directories. Multiple
+	 * Creates a new {@link FileIOChannel.ID} in one of the temp directories. Multiple
 	 * invocations of this method spread the channels evenly across the different directories.
 	 * 
 	 * @return A channel to a temporary directory.
 	 */
-	public Channel.ID createChannel()
-	{
+	public FileIOChannel.ID createChannel() {
 		final int num = getNextPathNum();
-		return new Channel.ID(this.paths[num], num, this.random);
+		return new FileIOChannel.ID(this.paths[num], num, this.random);
 	}
 
 	/**
-	 * Creates a new {@link Channel.Enumerator}, spreading the channels in a round-robin fashion
+	 * Creates a new {@link FileIOChannel.Enumerator}, spreading the channels in a round-robin fashion
 	 * across the temporary file directories.
 	 * 
 	 * @return An enumerator for channels.
 	 */
-	public Channel.Enumerator createChannelEnumerator()
-	{
-		return new Channel.Enumerator(this.paths, this.random);
+	public FileIOChannel.Enumerator createChannelEnumerator() {
+		return new FileIOChannel.Enumerator(this.paths, this.random);
 	}
 
 	
@@ -228,199 +103,65 @@ public class IOManager implements UncaughtExceptionHandler {
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
-	 * accepting write request, carrying them out at some time and returning the written segment to the given queue
-	 * afterwards.
+	 * Creates a block channel writer that writes to the given channel. The writer adds the
+	 * written segment to its return-queue afterwards (to allow for asynchronous implementations).
 	 * 
 	 * @param channelID The descriptor for the channel to write to.
-	 * @param returnQueue The queue to put the written buffers into.
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID,
-								LinkedBlockingQueue<MemorySegment> returnQueue)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue, 1);
+	public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
+		return createBlockChannelWriter(channelID, new LinkedBlockingQueue<MemorySegment>());
 	}
 	
 	/**
-	 * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
-	 * accepting write request, carrying them out at some time and returning the written segment to the given queue
-	 * afterwards.
-	 * <p>
-	 * The writer will collect a specified number of write requests and carry them out
-	 * in one, effectively writing one block in the size of multiple memory pages.
-	 * Note that this means that no memory segment will reach the return queue before
-	 * the given number of requests are collected, so the number of buffers used with
-	 * the writer should be greater than the number of requests to combine. Ideally,
-	 * the number of memory segments used is a multiple of the number of requests to
-	 * combine.
+	 * Creates a block channel writer that writes to the given channel. The writer adds the
+	 * written segment to the given queue (to allow for asynchronous implementations).
 	 * 
 	 * @param channelID The descriptor for the channel to write to.
 	 * @param returnQueue The queue to put the written buffers into.
-	 * @param numRequestsToCombine The number of write requests to combine to one I/O request.
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID,
-								LinkedBlockingQueue<MemorySegment> returnQueue, int numRequestsToCombine)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue, numRequestsToCombine);
-	}
+	public abstract BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
+				LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
 	
 	/**
-	 * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
-	 * accepting write request, carrying them out at some time and returning the written segment its return queue afterwards.
+	 * Creates a block channel writer that writes to the given channel. The writer calls the given callback
+	 * after the I/O operation has been performed (successfully or unsuccessfully), to allow
+	 * for asynchronous implementations.
 	 * 
 	 * @param channelID The descriptor for the channel to write to.
+	 * @param callback The callback to be called for 
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
-	public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), 1);
-	}
+	public abstract BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException;
 	
 	/**
-	 * Creates a block channel writer that writes to the given channel. The writer writes asynchronously (write-behind),
-	 * accepting write request, carrying them out at some time and returning the written segment its return queue afterwards.
-	 * <p>
-	 * The writer will collect a specified number of write requests and carry them out
-	 * in one, effectively writing one block in the size of multiple memory pages.
-	 * Note that this means that no memory segment will reach the return queue before
-	 * the given number of requests are collected, so the number of buffers used with
-	 * the writer should be greater than the number of requests to combine. Ideally,
-	 * the number of memory segments used is a multiple of the number of requests to
-	 * combine.
+	 * Creates a block channel reader that reads blocks from the given channel. The reader pushed
+	 * full memory segments (with the read data) to its "return queue", to allow for asynchronous read
+	 * implementations.
 	 * 
 	 * @param channelID The descriptor for the channel to write to.
-	 * @param numRequestsToCombine The number of write requests to combine to one I/O request.
-	 * @return A block channel writer that writes to the given channel.
-	 * @throws IOException Thrown, if the channel for the writer could not be opened.
-	 */
-	public BlockChannelWriter createBlockChannelWriter(Channel.ID channelID, int numRequestsToCombine)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), numRequestsToCombine);
-	}
-	
-	/**
-	 * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
-	 * such that a read request is accepted, carried out at some (close) point in time, and the full segment
-	 * is pushed to the given queue.
-	 * 
-	 * @param channelID The descriptor for the channel to write to.
-	 * @param returnQueue The queue to put the full buffers into.
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public BlockChannelReader createBlockChannelReader(Channel.ID channelID,
-										LinkedBlockingQueue<MemorySegment> returnQueue)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue, 1);
+	public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
+		return createBlockChannelReader(channelID, new LinkedBlockingQueue<MemorySegment>());
 	}
 	
 	/**
-	 * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
-	 * such that a read request is accepted, carried out at some (close) point in time, and the full segment
-	 * is pushed to the given queue.
-	 * <p>
-	 * The reader will collect a specified number of read requests and carry them out
-	 * in one, effectively reading one block in the size of multiple memory pages.
-	 * Note that this means that no memory segment will reach the return queue before
-	 * the given number of requests are collected, so the number of buffers used with
-	 * the reader should be greater than the number of requests to combine. Ideally,
-	 * the number of memory segments used is a multiple of the number of requests to
-	 * combine.
+	 * Creates a block channel reader that reads blocks from the given channel. The reader pushes the full segments
+	 * to the given queue, to allow for asynchronous implementations.
 	 * 
 	 * @param channelID The descriptor for the channel to write to.
 	 * @param returnQueue The queue to put the full buffers into.
-	 * @param numRequestsToCombine The number of read requests to combine to one I/O request.
-	 * @return A block channel reader that reads from the given channel.
-	 * @throws IOException Thrown, if the channel for the reader could not be opened.
-	 */
-	public BlockChannelReader createBlockChannelReader(Channel.ID channelID,
-					LinkedBlockingQueue<MemorySegment> returnQueue, int numRequestsToCombine)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue, numRequestsToCombine);
-	}
-	
-	/**
-	 * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
-	 * such that a read request is accepted, carried out at some (close) point in time, and the full segment
-	 * is pushed to the reader's return queue.
-	 * 
-	 * @param channelID The descriptor for the channel to write to.
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public BlockChannelReader createBlockChannelReader(Channel.ID channelID)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, new LinkedBlockingQueue<MemorySegment>(), 1);
-	}
-	
-	/**
-	 * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
-	 * such that a read request is accepted, carried out at some (close) point in time, and the full segment
-	 * is pushed to the reader's return queue.
-	 * <p>
-	 * The reader will collect a specified number of read requests and carry them out
-	 * in one, effectively reading one block in the size of multiple memory pages.
-	 * Note that this means that no memory segment will reach the return queue before
-	 * the given number of requests are collected, so the number of buffers used with
-	 * the reader should be greater than the number of requests to combine. Ideally,
-	 * the number of memory segments used is a multiple of the number of requests to
-	 * combine.
-	 * 
-	 * @param channelID The descriptor for the channel to write to.
-	 * @param numRequestsToCombine The number of write requests to combine to one I/O request.
-	 * @return A block channel reader that reads from the given channel.
-	 * @throws IOException Thrown, if the channel for the reader could not be opened.
-	 */
-	public BlockChannelReader createBlockChannelReader(Channel.ID channelID, int numRequestsToCombine)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, 
-			new LinkedBlockingQueue<MemorySegment>(), numRequestsToCombine);
-	}
+	public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, 
+										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
 	
 	/**
 	 * Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
@@ -437,217 +178,17 @@ public class IOManager implements UncaughtExceptionHandler {
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public BulkBlockChannelReader createBulkBlockChannelReader(Channel.ID channelID,
-			List<MemorySegment> targetSegments,	int numBlocks)
-	throws IOException
-	{
-		if (this.isClosed) {
-			throw new IllegalStateException("I/O-Manger is closed.");
-		}
-		
-		return new BulkBlockChannelReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
-	}
+	public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, 
+			List<MemorySegment> targetSegments, int numBlocks) throws IOException;
 	
-	// ========================================================================
-	//                             Utilities
-	// ========================================================================
+	// ------------------------------------------------------------------------
+	//                          Utilities
+	// ------------------------------------------------------------------------
 	
-	private final int getNextPathNum()
-	{
+	protected int getNextPathNum() {
 		final int next = this.nextPath;
 		final int newNext = next + 1;
 		this.nextPath = newNext >= this.paths.length ? 0 : newNext;
 		return next;
 	}
-	
-	
-	// ========================================================================
-	//                          I/O Worker Threads
-	// ========================================================================
-
-	/**
-	 * A worker thread for asynchronous read.
-	 * 
-	 */
-	private static final class ReaderThread extends Thread
-	{
-		protected final RequestQueue<ReadRequest> requestQueue;
-
-		private volatile boolean alive;
-
-		// ---------------------------------------------------------------------
-		// Constructors / Destructors
-		// ---------------------------------------------------------------------
-		
-		protected ReaderThread()
-		{
-			this.requestQueue = new RequestQueue<ReadRequest>();
-			this.alive = true;
-		}
-		
-		/**
-		 * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
-		 * immediately. All buffers of pending requests are handed back to their channel readers and an exception is
-		 * reported to them, declaring their request queue as closed.
-		 */
-		protected void shutdown()
-		{
-			if (this.alive) {
-				// shut down the thread
-				try {
-					this.alive = false;
-					this.requestQueue.close();
-					this.interrupt();
-				}
-				catch (Throwable t) {}
-			}
-			
-			// notify all pending write requests that the thread has been shut down
-			IOException ioex = new IOException("IO-Manager has been closed.");
-			
-			while (!this.requestQueue.isEmpty()) {
-				ReadRequest request = this.requestQueue.poll();
-				request.requestDone(ioex);
-			}
-		}
-
-		// ---------------------------------------------------------------------
-		//                             Main loop
-		// ---------------------------------------------------------------------
-
-		@Override
-		public void run()
-		{
-			while (this.alive)
-			{
-				
-				// get the next buffer. ignore interrupts that are not due to a shutdown.
-				ReadRequest request = null;
-				while (request == null) {
-					try {
-						request = this.requestQueue.take();
-					}
-					catch (InterruptedException iex) {
-						if (!this.alive) {
-							// exit
-							return;
-						}
-					}
-				}
-				
-				// remember any IO exception that occurs, so it can be reported to the writer
-				IOException ioex = null;
-
-				try {
-					// read buffer from the specified channel
-					request.read();
-				}
-				catch (IOException e) {
-					ioex = e;
-				}
-				catch (Throwable t) {
-					ioex = new IOException("The buffer could not be read: " + t.getMessage(), t);
-					IOManager.LOG.error("I/O reading thread encountered an error" + 
-						t.getMessage() == null ? "." : ": ", t);
-				}
-
-				// invoke the processed buffer handler of the request issuing reader object
-				request.requestDone(ioex);
-			} // end while alive
-		}
-		
-	} // end reading thread
-	
-	/**
-	 * A worker thread that asynchronously writes the buffers to disk.
-	 */
-	private static final class WriterThread extends Thread
-	{
-		protected final RequestQueue<WriteRequest> requestQueue;
-
-		private volatile boolean alive;
-
-		// ---------------------------------------------------------------------
-		// Constructors / Destructors
-		// ---------------------------------------------------------------------
-
-		protected WriterThread()
-		{
-			this.requestQueue = new RequestQueue<WriteRequest>();
-			this.alive = true;
-		}
-
-		/**
-		 * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
-		 * immediately. All buffers of pending requests are handed back to their channel writers and an exception is
-		 * reported to them, declaring their request queue as closed.
-		 */
-		protected void shutdown()
-		{
-			if (this.alive) {
-				// shut down the thread
-				try {
-					this.alive = false;
-					this.requestQueue.close();
-					this.interrupt();
-				}
-				catch (Throwable t) {}
-				
-				// notify all pending write requests that the thread has been shut down
-				IOException ioex = new IOException("Writer thread has been closed.");
-				
-				while (!this.requestQueue.isEmpty())
-				{
-					WriteRequest request = this.requestQueue.poll();
-					request.requestDone(ioex);
-				}
-			}
-		}
-
-		// ---------------------------------------------------------------------
-		// Main loop
-		// ---------------------------------------------------------------------
-
-		@Override
-		public void run()
-		{
-			while (this.alive) {
-				
-				WriteRequest request = null;
-				
-				// get the next buffer. ignore interrupts that are not due to a shutdown.
-				while (request == null) {
-					try {
-						request = requestQueue.take();
-					}
-					catch (InterruptedException iex) {
-						if (!this.alive) {
-							// exit
-							return;
-						}
-					}
-				}
-				
-				// remember any IO exception that occurs, so it can be reported to the writer
-				IOException ioex = null;
-				
-				try {
-					// write buffer to the specified channel
-					request.write();
-				}
-				catch (IOException e) {
-					ioex = e;
-				}
-				catch (Throwable t) {
-					ioex = new IOException("The buffer could not be written: " + t.getMessage(), t);
-					IOManager.LOG.error("I/O reading thread encountered an error" + 
-						t.getMessage() == null ? "." : ": ", t);
-				}
-
-				// invoke the processed buffer handler of the request issuing writer object
-				request.requestDone(ioex);
-			} // end while alive
-		}
-		
-	}; // end writer thread
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
new file mode 100644
index 0000000..1f79067
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A version of the {@link IOManager} that uses asynchronous I/O.
+ */
+public class IOManagerAsync extends IOManager implements UncaughtExceptionHandler {
+	
+	/** The writer threads used for asynchronous block oriented channel writing. */
+	private final WriterThread[] writers;
+
+	/** The reader threads used for asynchronous block oriented channel reading. */
+	private final ReaderThread[] readers;
+	
+	/** Lock object to guard shutdown */
+	private final Object shutdownLock = new Object();
+	
+	/** Flag to mark the I/O manager as alive or shut down */
+	private volatile boolean shutdown;
+	
+	// -------------------------------------------------------------------------
+	//               Constructors / Destructors
+	// -------------------------------------------------------------------------
+
+	/**
+	 * Constructs a new asynchronous I/O manager, writing files to the system's temp directory.
+	 */
+	public IOManagerAsync() {
+		this(EnvironmentInformation.getTemporaryFileDirectory());
+	}
+	
+	/**
+	 * Constructs a new asynchronous I/O manager, writing files to the given directory.
+	 * 
+	 * @param tempDir The directory to write temporary files to.
+	 */
+	public IOManagerAsync(String tempDir) {
+		this(new String[] {tempDir});
+	}
+
+	/**
+	 * Constructs a new asynchronous I/O manager, writing files round-robin across the given directories.
+	 * 
+	 * @param tempDirs The directories to write temporary files to.
+	 */
+	public IOManagerAsync(String[] tempDirs) {
+		super(tempDirs);
+		
+		// start a write worker thread for each directory
+		this.writers = new WriterThread[tempDirs.length];
+		for (int i = 0; i < this.writers.length; i++) {
+			final WriterThread t = new WriterThread();
+			this.writers[i] = t;
+			t.setName("IOManager writer thread #" + (i + 1));
+			// daemon threads, so the worker threads do not keep the JVM alive by themselves
+			t.setDaemon(true);
+			// route uncaught worker failures back to this manager (which shuts down in response)
+			t.setUncaughtExceptionHandler(this);
+			t.start();
+		}
+
+		// start a reader worker thread for each directory
+		this.readers = new ReaderThread[tempDirs.length];
+		for (int i = 0; i < this.readers.length; i++) {
+			final ReaderThread t = new ReaderThread();
+			this.readers[i] = t;
+			t.setName("IOManager reader thread #" + (i + 1));
+			t.setDaemon(true);
+			t.setUncaughtExceptionHandler(this);
+			t.start();
+		}
+	}
+
+	/**
+	 * Close method. Shuts down the reader and writer threads immediately, not waiting for their
+	 * pending requests to be served. This method waits until the threads have actually ceased their
+	 * operation.
+	 */
+	@Override
+	public void shutdown() {
+		synchronized (shutdownLock) {
+			if (shutdown) {
+				return;
+			}
+			
+			shutdown = true;
+			
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Shutting down I/O manager.");
+			}
+
+			// close writing and reading threads with best effort and log problems
+			// first notify all to close, then wait until all are closed
+
+			for (WriterThread wt : writers) {
+				try {
+					wt.shutdown();
+				}
+				catch (Throwable t) {
+					LOG.error("Error while shutting down IO Manager writer thread.", t);
+				}
+			}
+			for (ReaderThread rt : readers) {
+				try {
+					rt.shutdown();
+				}
+				catch (Throwable t) {
+					LOG.error("Error while shutting down IO Manager reader thread.", t);
+				}
+			}
+			try {
+				for (WriterThread wt : writers) {
+					wt.join();
+				}
+				for (ReaderThread rt : readers) {
+					rt.join();
+				}
+			}
+			catch (InterruptedException iex) {
+				// restore the interrupted status rather than silently swallowing it,
+				// so callers can still observe the interruption
+				Thread.currentThread().interrupt();
+			}
+		}
+	}
+	
+	/**
+	 * Utility method to check whether the IO manager has been properly shut down. The IO manager is considered
+	 * to be properly shut down when it is closed and its threads have ceased operation.
+	 * 
+	 * @return True, if the IO manager has properly shut down, false otherwise.
+	 */
+	@Override
+	public boolean isProperlyShutDown() {
+		boolean readersShutDown = true;
+		for (ReaderThread rt : readers) {
+			readersShutDown &= rt.getState() == Thread.State.TERMINATED;
+		}
+		
+		// BUG FIX: this loop previously accumulated into 'readersShutDown', leaving
+		// 'writersShutDown' always true and ignoring the writer threads' actual state
+		boolean writersShutDown = true;
+		for (WriterThread wt : writers) {
+			writersShutDown &= wt.getState() == Thread.State.TERMINATED;
+		}
+		
+		return shutdown && writersShutDown && readersShutDown;
+	}
+
+
+	@Override
+	public void uncaughtException(Thread t, Throwable e) {
+		// a reader/writer worker thread died unexpectedly; log the cause and take
+		// down the whole I/O manager, since its thread pool is no longer intact
+		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
+		shutdown();	
+	}
+	
+	// ------------------------------------------------------------------------
+	//                        Reader / Writer instantiations
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Creates a block channel writer that writes to the given channel. The writer writes asynchronously,
+	 * handing written segments back through the given return queue.
+	 * 
+	 * @param channelID The descriptor for the channel to write to.
+	 * @param returnQueue The queue to hand the written buffers back through.
+	 * @return A block channel writer that writes to the given channel.
+	 * @throws IOException Thrown, if the channel for the writer could not be opened.
+	 */
+	@Override
+	public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
+								LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
+	{
+		// typo fix in the error message: "I/O-Manger" -> "I/O-Manager"
+		Preconditions.checkState(!shutdown, "I/O-Manager is closed.");
+		return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
+	}
+	
+	@Override
+	public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException {
+		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
+	}
+	
+	/**
+	 * Creates a block channel reader that reads blocks from the given channel. The reader reads asynchronously,
+	 * such that a read request is accepted, carried out at some (close) point in time, and the full segment
+	 * is pushed to the given queue.
+	 * 
+	 * @param channelID The descriptor for the channel to read from.
+	 * @param returnQueue The queue to put the full buffers into.
+	 * @return A block channel reader that reads from the given channel.
+	 * @throws IOException Thrown, if the channel for the reader could not be opened.
+	 */
+	@Override
+	public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
+										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
+	{
+		// typo fix in the error message: "I/O-Manger" -> "I/O-Manager"
+		Preconditions.checkState(!shutdown, "I/O-Manager is closed.");
+		return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
+	}
+	
+	/**
+	 * Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
+	 * The reader draws segments to read the blocks into from a supplied list, which must contain as many
+	 * segments as the channel has blocks. After the reader is done, the list with the full segments can be 
+	 * obtained from the reader.
+	 * <p>
+	 * If a channel is not to be read in one bulk, but in multiple smaller batches, a  
+	 * {@link BlockChannelReader} should be used.
+	 * 
+	 * @param channelID The descriptor for the channel to read from.
+	 * @param targetSegments The list to take the segments from into which to read the data.
+	 * @param numBlocks The number of blocks in the channel to read.
+	 * @return A block channel reader that reads from the given channel.
+	 * @throws IOException Thrown, if the channel for the reader could not be opened.
+	 */
+	@Override
+	public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
+			List<MemorySegment> targetSegments,	int numBlocks) throws IOException
+	{
+		// typo fix in the error message: "I/O-Manger" -> "I/O-Manager"
+		Preconditions.checkState(!shutdown, "I/O-Manager is closed.");
+		return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
+	}
+
+	// -------------------------------------------------------------------------
+	//                           I/O Worker Threads
+	// -------------------------------------------------------------------------
+	
+	/**
+	 * A worker thread for asynchronous reads.
+	 */
+	private static final class ReaderThread extends Thread {
+		
+		/** The queue from which this thread takes the read requests to process. */
+		protected final RequestQueue<ReadRequest> requestQueue;
+
+		/** Flag that stays true as long as this thread should keep processing requests. */
+		private volatile boolean alive;
+
+		// ---------------------------------------------------------------------
+		// Constructors / Destructors
+		// ---------------------------------------------------------------------
+		
+		protected ReaderThread() {
+			this.requestQueue = new RequestQueue<ReadRequest>();
+			this.alive = true;
+		}
+		
+		/**
+		 * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
+		 * immediately. All buffers of pending requests are handed back to their channel readers and an exception is
+		 * reported to them, declaring their request queue as closed.
+		 */
+		protected void shutdown() {
+			synchronized (this) {
+				if (alive) {
+					alive = false;
+					requestQueue.close();
+					interrupt();
+				}
+
+				try {
+					join(1000);
+				}
+				catch (InterruptedException e) {
+					// restore the interrupted status rather than silently swallowing it
+					Thread.currentThread().interrupt();
+				}
+				
+				// notify all pending read requests that the thread has been shut down
+				IOException ioex = new IOException("IO-Manager has been closed.");
+					
+				while (!this.requestQueue.isEmpty()) {
+					ReadRequest request = this.requestQueue.poll();
+					if (request != null) {
+						try {
+							request.requestDone(ioex);
+						}
+						catch (Throwable t) {
+							IOManagerAsync.LOG.error("The handler of the request complete callback threw an exception"
+									+ (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+						}
+					}
+				}
+			}
+		}
+
+		// ---------------------------------------------------------------------
+		//                             Main loop
+		// ---------------------------------------------------------------------
+
+		@Override
+		public void run() {
+			
+			while (alive) {
+				
+				// get the next buffer. ignore interrupts that are not due to a shutdown.
+				ReadRequest request = null;
+				while (alive && request == null) {
+					try {
+						request = this.requestQueue.take();
+					}
+					catch (InterruptedException e) {
+						if (!this.alive) {
+							return;
+						} else {
+							IOManagerAsync.LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
+						}
+					}
+				}
+				
+				// BUG FIX: the wait loop above may exit with a null request when 'alive'
+				// flips to false during shutdown; bail out instead of dereferencing null
+				if (request == null) {
+					break;
+				}
+				
+				// remember any IO exception that occurs, so it can be reported to the reader
+				IOException ioex = null;
+
+				try {
+					// read buffer from the specified channel
+					request.read();
+				}
+				catch (IOException e) {
+					ioex = e;
+				}
+				catch (Throwable t) {
+					ioex = new IOException("The buffer could not be read: " + t.getMessage(), t);
+					IOManagerAsync.LOG.error("I/O reading thread encountered an error" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+				}
+
+				// invoke the processed buffer handler of the request issuing reader object
+				try {
+					request.requestDone(ioex);
+				}
+				catch (Throwable t) {
+					IOManagerAsync.LOG.error("The handler of the request-complete-callback threw an exception" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+				}
+			} // end while alive
+		}
+		
+	} // end reading thread
+	
+	/**
+	 * A worker thread that asynchronously writes the buffers to disk.
+	 */
+	private static final class WriterThread extends Thread {
+		
+		/** The queue from which this thread takes the write requests to process. */
+		protected final RequestQueue<WriteRequest> requestQueue;
+
+		/** Flag that stays true as long as this thread should keep processing requests. */
+		private volatile boolean alive;
+
+		// ---------------------------------------------------------------------
+		// Constructors / Destructors
+		// ---------------------------------------------------------------------
+
+		protected WriterThread() {
+			this.requestQueue = new RequestQueue<WriteRequest>();
+			this.alive = true;
+		}
+
+		/**
+		 * Shuts the thread down. This operation does not wait for all pending requests to be served, halts the thread
+		 * immediately. All buffers of pending requests are handed back to their channel writers and an exception is
+		 * reported to them, declaring their request queue as closed.
+		 */
+		protected void shutdown() {
+			synchronized (this) {
+				if (alive) {
+					alive = false;
+					requestQueue.close();
+					interrupt();
+				}
+
+				try {
+					join(1000);
+				}
+				catch (InterruptedException e) {
+					// restore the interrupted status rather than silently swallowing it
+					Thread.currentThread().interrupt();
+				}
+				
+				// notify all pending write requests that the thread has been shut down
+				IOException ioex = new IOException("IO-Manager has been closed.");
+					
+				while (!this.requestQueue.isEmpty()) {
+					WriteRequest request = this.requestQueue.poll();
+					if (request != null) {
+						try {
+							request.requestDone(ioex);
+						}
+						catch (Throwable t) {
+							IOManagerAsync.LOG.error("The handler of the request complete callback threw an exception"
+									+ (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+						}
+					}
+				}
+			}
+		}
+
+		// ---------------------------------------------------------------------
+		// Main loop
+		// ---------------------------------------------------------------------
+
+		@Override
+		public void run() {
+			
+			while (this.alive) {
+				
+				WriteRequest request = null;
+				
+				// get the next buffer. ignore interrupts that are not due to a shutdown.
+				while (alive && request == null) {
+					try {
+						request = requestQueue.take();
+					}
+					catch (InterruptedException e) {
+						if (!this.alive) {
+							return;
+						} else {
+							IOManagerAsync.LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
+						}
+					}
+				}
+				
+				// BUG FIX: the wait loop above may exit with a null request when 'alive'
+				// flips to false during shutdown; bail out instead of dereferencing null
+				if (request == null) {
+					break;
+				}
+				
+				// remember any IO exception that occurs, so it can be reported to the writer
+				IOException ioex = null;
+				
+				try {
+					// write buffer to the specified channel
+					request.write();
+				}
+				catch (IOException e) {
+					ioex = e;
+				}
+				catch (Throwable t) {
+					ioex = new IOException("The buffer could not be written: " + t.getMessage(), t);
+					IOManagerAsync.LOG.error("I/O writing thread encountered an error" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+				}
+
+				// invoke the processed buffer handler of the request issuing writer object
+				try {
+					request.requestDone(ioex);
+				}
+				catch (Throwable t) {
+					IOManagerAsync.LOG.error("The handler of the request-complete-callback threw an exception" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
+				}
+			} // end while alive
+		}
+		
+	} // end writer thread
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
index d24e8c7..1b7adae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
@@ -16,23 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.IOException;
 
-
 /**
  * Basic interface that I/O requests that are sent to the threads of the I/O manager need to implement.
- *
  */
-interface IORequest
-{
+interface IORequest {
+	
 	/**
 	 * Method that is called by the target I/O thread after the request has been processed.
 	 * 
-	 * @param ioex The exception that occurred while processing the I/O request. Is <tt>null</tt> if everything
-	 *             was fine.
+	 * @param ioex The exception that occurred while processing the I/O request. Is <tt>null</tt> if everything was fine.
 	 */
 	public void requestDone(IOException ioex);
 }
@@ -41,8 +37,8 @@ interface IORequest
 /**
  * Interface for I/O requests that are handled by the IOManager's reading thread. 
  */
-interface ReadRequest extends IORequest
-{
+interface ReadRequest extends IORequest {
+	
 	/**
 	 * Called by the target I/O thread to perform the actual reading operation.
 	 * 
@@ -55,8 +51,8 @@ interface ReadRequest extends IORequest
 /**
  * Interface for I/O requests that are handled by the IOManager's writing thread.
  */
-interface WriteRequest extends IORequest
-{
+interface WriteRequest extends IORequest {
+	
 	/**
 	 * Called by the target I/O thread to perform the actual writing operation.
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
new file mode 100644
index 0000000..78699e2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * A {@link RequestDoneCallback} that adds the memory segments to a blocking queue.
+ */
+public class QueuingCallback implements RequestDoneCallback {
+
+	/** The queue into which the segments of completed requests are handed back. */
+	private final LinkedBlockingQueue<MemorySegment> queue;
+	
+	public QueuingCallback(LinkedBlockingQueue<MemorySegment> queue) {
+		this.queue = queue;
+	}
+
+	@Override
+	public void requestSuccessful(MemorySegment buffer) {
+		queue.add(buffer);
+	}
+
+	@Override
+	public void requestFailed(MemorySegment buffer, IOException e) {
+		// the I/O error is recorded in the writer already; the buffer is still
+		// handed back through the queue so it is not lost
+		queue.add(buffer);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
new file mode 100644
index 0000000..982343c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * Callback to be executed on completion of an asynchronous I/O request.
+ * Depending on success or failure, either
+ * {@link #requestSuccessful(MemorySegment)} or {@link #requestFailed(MemorySegment, IOException)}
+ * is called.
+ */
+public interface RequestDoneCallback {
+
+	/** Called when a request completed successfully, handing back the request's buffer. */
+	void requestSuccessful(MemorySegment buffer);
+	
+	/** Called when a request failed, handing back the buffer together with the failure cause. */
+	void requestFailed(MemorySegment buffer, IOException e);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
index b506380..230bf50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestQueue.java
@@ -16,29 +16,19 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
-
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
-
 /**
  * A {@link LinkedBlockingQueue} that is extended with closing methods.
- *
  */
-public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Closeable
-{
-	/**
-	 * UID for serialization interoperability. 
-	 */
+public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Closeable {
+	
 	private static final long serialVersionUID = 3804115535778471680L;
 	
-	/**
-	 * Flag marking this queue as closed.
-	 */
+	/** Flag marking this queue as closed. */
 	private volatile boolean closed = false;
 	
 	/**
@@ -47,7 +37,7 @@ public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Clo
 	 * @see java.io.Closeable#close()
 	 */
 	@Override
-	public void close() throws IOException {
+	public void close() {
 		this.closed = true;
 	}
 	
@@ -56,9 +46,7 @@ public final class RequestQueue<E> extends LinkedBlockingQueue<E> implements Clo
 	 * 
 	 * @return True, if the queue is closed, false otherwise.
 	 */
-	public boolean isClosed()
-	{
+	public boolean isClosed() {
 		return this.closed;
 	}
-	
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
new file mode 100644
index 0000000..fd6c230
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousFileIOChannel.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import java.io.IOException;
+
+/**
+ * A base class for synchronous readers and writers.
+ */
+public abstract class SynchronousFileIOChannel extends AbstractFileIOChannel {
+	
+	protected SynchronousFileIOChannel(FileIOChannel.ID channelID, boolean writeEnabled) throws IOException {
+		super(channelID, writeEnabled);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean isClosed() {
+		// the channel counts as closed once the underlying file channel is no longer open
+		return !this.fileChannel.isOpen();
+	}
+	
+	@Override
+	public void close() throws IOException {
+		// closing is idempotent: only close the file channel if it is still open
+		if (this.fileChannel.isOpen()) {
+			this.fileChannel.close();
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
index b5504f3..a02e81c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SerializedUpdateBuffer.java
@@ -30,7 +30,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
 import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
@@ -49,7 +49,7 @@ public class SerializedUpdateBuffer extends AbstractPagedOutputView {
 
 	private final IOManager ioManager;
 
-	private final Channel.Enumerator channelEnumerator;
+	private final FileIOChannel.Enumerator channelEnumerator;
 
 	private final int numSegmentsSpillingThreshold;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 020da9d..6d3194b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -33,7 +33,7 @@ import org.apache.flink.core.memory.SeekableDataInputView;
 import org.apache.flink.core.memory.SeekableDataOutputView;
 import org.apache.flink.runtime.io.disk.RandomAccessOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
@@ -282,7 +282,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	 * @return The number of buffers that were freed by spilling this partition.
 	 * @throws IOException Thrown, if the writing failed.
 	 */
-	public int spillPartition(List<MemorySegment> target, IOManager ioAccess, Channel.ID targetChannel,
+	public int spillPartition(List<MemorySegment> target, IOManager ioAccess, FileIOChannel.ID targetChannel,
 			LinkedBlockingQueue<MemorySegment> bufferReturnQueue)
 	throws IOException
 	{
@@ -311,7 +311,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		return this.buildSideWriteBuffer.spill(this.buildSideChannel);
 	}
 	
-	public void finalizeBuildPhase(IOManager ioAccess, Channel.Enumerator probeChannelEnumerator,
+	public void finalizeBuildPhase(IOManager ioAccess, FileIOChannel.Enumerator probeChannelEnumerator,
 			LinkedBlockingQueue<MemorySegment> bufferReturnQueue)
 	throws IOException
 	{
@@ -449,7 +449,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	//                   ReOpenableHashTable related methods
 	// --------------------------------------------------------------------------------------------------
 	
-	public void prepareProbePhase(IOManager ioAccess, Channel.Enumerator probeChannelEnumerator,
+	public void prepareProbePhase(IOManager ioAccess, FileIOChannel.Enumerator probeChannelEnumerator,
 			LinkedBlockingQueue<MemorySegment> bufferReturnQueue) throws IOException {
 		if (isInMemory()) {
 			return;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 21bd8df..1bbf246 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -37,7 +37,7 @@ import org.apache.flink.core.memory.SeekableDataOutputView;
 import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -306,7 +306,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 * The channel enumerator that is used while processing the current partition to create
 	 * channels for the spill partitions it requires.
 	 */
-	protected Channel.Enumerator currentEnumerator;
+	protected FileIOChannel.Enumerator currentEnumerator;
 	
 	/**
 	 * The array of memory segments that contain the buckets which form the actual hash-table

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
index 3695aea..56dcfae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableHashPartition.java
@@ -28,14 +28,14 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentSource;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
 import org.apache.flink.runtime.io.disk.iomanager.BulkBlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 
 public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
 
 	protected int initialPartitionBuffersCount = -1; 						// stores the number of buffers used for an in-memory partition after the build phase has finished.
 
-	private Channel.ID initialBuildSideChannel = null;			// path to initial build side contents (only for in-memory partitions)
+	private FileIOChannel.ID initialBuildSideChannel = null;			// path to initial build side contents (only for in-memory partitions)
 	
 	private BlockChannelWriter initialBuildSideWriter = null;
 
@@ -97,7 +97,7 @@ public class ReOpenableHashPartition<BT, PT> extends HashPartition<BT, PT> {
 	 * 
 	 * @return Number of memorySegments in the writeBehindBuffers!
 	 */
-	int spillInMemoryPartition(Channel.ID targetChannel, IOManager ioManager, LinkedBlockingQueue<MemorySegment> writeBehindBuffers) throws IOException {
+	int spillInMemoryPartition(FileIOChannel.ID targetChannel, IOManager ioManager, LinkedBlockingQueue<MemorySegment> writeBehindBuffers) throws IOException {
 		this.initialPartitionBuffersCount = partitionBuffers.length; // for ReOpenableHashMap
 		this.initialBuildSideChannel = targetChannel;
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
index 5099f38..6819924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -36,7 +36,7 @@ public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT>
 	/**
 	 * Channel for the spilled partitions
 	 */
-	private final Channel.Enumerator spilledInMemoryPartitions;
+	private final FileIOChannel.Enumerator spilledInMemoryPartitions;
 	
 	/**
 	 * Stores the initial partitions and a list of the files that contain the spilled contents

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index e566e25..63e64c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;
@@ -36,9 +35,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelAccess;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -267,7 +265,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 				throw new IOException("The user-defined combiner failed in its 'open()' method.", t);
 			}
 			
-			final Channel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
+			final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();
 			List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();
 
 			
@@ -296,7 +294,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 				}
 				
 				// open next channel
-				Channel.ID channel = enumerator.next();
+				FileIOChannel.ID channel = enumerator.next();
 				registerChannelToBeRemovedAtShudown(channel);
 				
 				if (LOG.isDebugEnabled()) {
@@ -304,8 +302,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 				}
 
 				// create writer
-				final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
-																channel, this.numWriteBuffersToCluster);
+				final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
 				registerOpenChannelToBeRemovedAtShudown(writer);
 				final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
 																			this.memManager.getPageSize());
@@ -420,7 +417,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 				
 				// get the readers and register them to be released
 				final MergeIterator<E> mergeIterator = getMergingIterator(
-						channelIDs, readBuffers, new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size()));
+						channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size()));
 				
 				// set the target for the user iterator
 				// if the final merge combines, create a combining iterator around the merge iterator,
@@ -452,17 +449,16 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 		throws IOException
 		{
 			// the list with the readers, to be closed at shutdown
-			final List<BlockChannelAccess<?, ?>> channelAccesses = new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size());
+			final List<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(channelIDs.size());
 
 			// the list with the target iterators
 			final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses);
 			final KeyGroupedIterator<E> groupedIter = new KeyGroupedIterator<E>(mergeIterator, this.serializer, this.comparator2);
 
 			// create a new channel writer
-			final Channel.ID mergedChannelID = this.ioManager.createChannel();
+			final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
 			registerChannelToBeRemovedAtShudown(mergedChannelID);
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
-															mergedChannelID, this.numWriteBuffersToCluster);
+			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
 			registerOpenChannelToBeRemovedAtShudown(writer);
 			final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers, 
 																			this.memManager.getPageSize());
@@ -488,7 +484,7 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 			
 			// remove the merged channel readers from the clear-at-shutdown list
 			for (int i = 0; i < channelAccesses.size(); i++) {
-				BlockChannelAccess<?, ?> access = channelAccesses.get(i);
+				FileIOChannel access = channelAccesses.get(i);
 				access.closeAndDelete();
 				unregisterOpenChannelToBeRemovedAtShudown(access);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 3dd21f5..459ef82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -37,14 +37,13 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelAccess;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.Channel.ID;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -135,12 +134,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	/**
 	 * Collection of all currently open channels, to be closed and deleted during cleanup.
 	 */
-	private final HashSet<BlockChannelAccess<?, ?>> openChannels;
+	private final HashSet<FileIOChannel> openChannels;
 	
 	/**
 	 * Collection of all temporary files created and to be removed when closing the sorter.
 	 */
-	private final HashSet<Channel.ID> channelsToDeleteAtShutdown;
+	private final HashSet<FileIOChannel.ID> channelsToDeleteAtShutdown;
 	
 	/**
 	 * The monitor which guards the iterator field.
@@ -387,8 +386,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		};
 		
 		// create sets that track the channels we need to clean up when closing the sorter
-		this.channelsToDeleteAtShutdown = new HashSet<Channel.ID>(64);
-		this.openChannels = new HashSet<BlockChannelAccess<?,?>>(64);
+		this.channelsToDeleteAtShutdown = new HashSet<FileIOChannel.ID>(64);
+		this.openChannels = new HashSet<FileIOChannel>(64);
 
 		// start the thread that reads the input channels
 		this.readThread = getReadingThread(exceptionHandler, input, circularQueues, parentTask,
@@ -519,8 +518,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			// we have to loop this, because it may fail with a concurrent modification exception
 			while (!this.openChannels.isEmpty()) {
 				try {
-					for (Iterator<BlockChannelAccess<?, ?>> channels = this.openChannels.iterator(); channels.hasNext(); ) {
-						final BlockChannelAccess<?, ?> channel = channels.next();
+					for (Iterator<FileIOChannel> channels = this.openChannels.iterator(); channels.hasNext(); ) {
+						final FileIOChannel channel = channels.next();
 						channels.remove();
 						channel.closeAndDelete();
 					}
@@ -531,8 +530,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			// we have to loop this, because it may fail with a concurrent modification exception
 			while (!this.channelsToDeleteAtShutdown.isEmpty()) {
 				try {
-					for (Iterator<Channel.ID> channels = this.channelsToDeleteAtShutdown.iterator(); channels.hasNext(); ) {
-						final Channel.ID channel = channels.next();
+					for (Iterator<FileIOChannel.ID> channels = this.channelsToDeleteAtShutdown.iterator(); channels.hasNext(); ) {
+						final FileIOChannel.ID channel = channels.next();
 						channels.remove();
 						try {
 							final File f = new File(channel.getPath());
@@ -1257,7 +1256,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			
 			// ------------------- Spilling Phase ------------------------
 			
-			final Channel.Enumerator enumerator = this.ioManager.createChannelEnumerator();			
+			final FileIOChannel.Enumerator enumerator = this.ioManager.createChannelEnumerator();			
 			List<ChannelWithBlockCount> channelIDs = new ArrayList<ChannelWithBlockCount>();
 
 			
@@ -1286,12 +1285,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				}
 				
 				// open next channel
-				Channel.ID channel = enumerator.next();
+				FileIOChannel.ID channel = enumerator.next();
 				registerChannelToBeRemovedAtShudown(channel);
 
 				// create writer
-				final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
-																channel, this.numWriteBuffersToCluster);
+				final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
 				registerOpenChannelToBeRemovedAtShudown(writer);
 				final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, this.writeMemory,
 																			this.memManager.getPageSize());
@@ -1351,7 +1349,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				getSegmentsForReaders(readBuffers, this.sortReadMemory, channelIDs.size());
 				
 				// get the readers and register them to be released
-				setResultIterator(getMergingIterator(channelIDs, readBuffers, new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size())));
+				setResultIterator(getMergingIterator(channelIDs, readBuffers, new ArrayList<FileIOChannel>(channelIDs.size())));
 			}
 
 			// done
@@ -1405,7 +1403,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * @throws IOException Thrown, if the readers encounter an I/O problem.
 		 */
 		protected final MergeIterator<E> getMergingIterator(final List<ChannelWithBlockCount> channelIDs,
-				final List<List<MemorySegment>> inputSegments, List<BlockChannelAccess<?, ?>> readerList)
+				final List<List<MemorySegment>> inputSegments, List<FileIOChannel> readerList)
 			throws IOException
 		{
 			// create one iterator per channel id
@@ -1420,9 +1418,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				final List<MemorySegment> segsForChannel = inputSegments.get(i);
 				
 				// create a reader. if there are multiple segments for the reader, issue multiple together per I/O request
-				final BlockChannelReader reader = segsForChannel.size() >= 4 ? 
-					this.ioManager.createBlockChannelReader(channel.getChannel(), segsForChannel.size() / 2) :
-					this.ioManager.createBlockChannelReader(channel.getChannel());
+				final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channel.getChannel());
 					
 				readerList.add(reader);
 				registerOpenChannelToBeRemovedAtShudown(reader);
@@ -1495,16 +1491,15 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		throws IOException
 		{
 			// the list with the readers, to be closed at shutdown
-			final List<BlockChannelAccess<?, ?>> channelAccesses = new ArrayList<BlockChannelAccess<?, ?>>(channelIDs.size());
+			final List<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(channelIDs.size());
 
 			// the list with the target iterators
 			final MergeIterator<E> mergeIterator = getMergingIterator(channelIDs, readBuffers, channelAccesses);
 
 			// create a new channel writer
-			final Channel.ID mergedChannelID = this.ioManager.createChannel();
+			final FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
 			registerChannelToBeRemovedAtShudown(mergedChannelID);
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(
-															mergedChannelID, this.numWriteBuffersToCluster);
+			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
 			registerOpenChannelToBeRemovedAtShudown(writer);
 			final ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers, 
 																			this.memManager.getPageSize());
@@ -1523,7 +1518,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			
 			// remove the merged channel readers from the clear-at-shutdown list
 			for (int i = 0; i < channelAccesses.size(); i++) {
-				BlockChannelAccess<?, ?> access = channelAccesses.get(i);
+				FileIOChannel access = channelAccesses.get(i);
 				access.closeAndDelete();
 				unregisterOpenChannelToBeRemovedAtShudown(access);
 			}
@@ -1577,7 +1572,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * 
 		 * @param channel The channel id.
 		 */
-		protected void registerChannelToBeRemovedAtShudown(Channel.ID channel) {
+		protected void registerChannelToBeRemovedAtShudown(FileIOChannel.ID channel) {
 			UnilateralSortMerger.this.channelsToDeleteAtShutdown.add(channel);
 		}
 
@@ -1586,7 +1581,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * 
 		 * @param channel The channel id.
 		 */
-		protected void unregisterChannelToBeRemovedAtShudown(Channel.ID channel) {
+		protected void unregisterChannelToBeRemovedAtShudown(FileIOChannel.ID channel) {
 			UnilateralSortMerger.this.channelsToDeleteAtShutdown.remove(channel);
 		}
 		
@@ -1595,7 +1590,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * 
 		 * @param channel The channel reader/writer.
 		 */
-		protected void registerOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> channel) {
+		protected void registerOpenChannelToBeRemovedAtShudown(FileIOChannel channel) {
 			UnilateralSortMerger.this.openChannels.add(channel);
 		}
 
@@ -1604,7 +1599,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		 * 
 		 * @param channel The channel reader/writer.
 		 */
-		protected void unregisterOpenChannelToBeRemovedAtShudown(BlockChannelAccess<?, ?> channel) {
+		protected void unregisterOpenChannelToBeRemovedAtShudown(FileIOChannel channel) {
 			UnilateralSortMerger.this.openChannels.remove(channel);
 		}
 	}
@@ -1769,7 +1764,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	
 	protected static final class ChannelWithBlockCount
 	{
-		private final Channel.ID channel;
+		private final FileIOChannel.ID channel;
 		private final int blockCount;
 		
 		public ChannelWithBlockCount(ID channel, int blockCount) {
@@ -1777,7 +1772,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			this.blockCount = blockCount;
 		}
 
-		public Channel.ID getChannel() {
+		public FileIOChannel.ID getChannel() {
 			return channel;
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 7220b23..93bece2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -75,6 +75,7 @@ import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.ChannelManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkConnectionManager;
@@ -354,7 +355,7 @@ public class TaskManager implements TaskOperationProtocol {
 						(blobServerAddress), GlobalConfiguration.getConfiguration());
 			}
 		}
-		this.ioManager = new IOManager(tmpDirPaths);
+		this.ioManager = new IOManagerAsync(tmpDirPaths);
 		
 		// start the heart beats
 		{

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
index 72fe3f1..535c756 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/EnvironmentInformation.java
@@ -27,6 +27,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 
+/**
+ * Utility class that gives access to the execution environment of the JVM, like
+ * the executing user, startup options, or the JVM version.
+ */
 public class EnvironmentInformation {
 
 	private static final Logger LOG = LoggerFactory.getLogger(EnvironmentInformation.class);
@@ -183,6 +187,10 @@ public class EnvironmentInformation {
 			return UNKNOWN;
 		}
 	}
+	
+	public static String getTemporaryFileDirectory() {
+		return System.getProperty("java.io.tmpdir");
+	}
 
 	public static void logEnvironmentInfo(Logger log, String componentName) {
 		if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 879bd37..b4070ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -55,7 +55,7 @@ public final class BlobKeyTest {
 	}
 
 	/**
-	 * Tests the serialization/deserialization of BLOB keys using the regular {@link IOReadableWritable} API.
+	 * Tests the serialization/deserialization of BLOB keys using the regular {@link org.apache.flink.core.io.IOReadableWritable} API.
 	 */
 	@Test
 	public void testSerialization() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index 2073061..c05fcca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -23,14 +23,14 @@ import java.io.EOFException;
 import java.util.List;
 
 import org.junit.Assert;
-
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -79,7 +79,7 @@ public class ChannelViewsTest
 	@Before
 	public void beforeTest() {
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1, MEMORY_PAGE_SIZE);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
@@ -103,7 +103,7 @@ public class ChannelViewsTest
 	public void testWriteReadSmallRecords() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -147,7 +147,7 @@ public class ChannelViewsTest
 	public void testWriteAndReadLongRecords() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -188,7 +188,7 @@ public class ChannelViewsTest
 	public void testReadTooMany() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -239,7 +239,7 @@ public class ChannelViewsTest
 	public void testReadWithoutKnownBlockCount() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -283,7 +283,7 @@ public class ChannelViewsTest
 	public void testWriteReadOneBufferOnly() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, 1);
@@ -327,7 +327,7 @@ public class ChannelViewsTest
 	public void testWriteReadNotAll() throws Exception
 	{
 		final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		// create the writer output view
 		List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 629d1dc..22c40f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -23,11 +23,11 @@ import java.io.EOFException;
 import java.util.ArrayList;
 
 import org.junit.Assert;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.SpillingBuffer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
@@ -71,7 +71,7 @@ public class SpillingBufferTest {
 	@Before
 	public void beforeTest() {
 		memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		ioManager = new IOManager();
+		ioManager = new IOManagerAsync();
 	}
 
 	@After


Mime
View raw message