flink-commits mailing list archives

From: u..@apache.org
Subject: [6/9] flink git commit: [FLINK-1296] [runtime] Add better paged disk I/O readers / writers
Date: Wed, 21 Jan 2015 11:08:56 GMT
[FLINK-1296] [runtime] Add better paged disk I/O readers / writers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/996d404c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/996d404c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/996d404c

Branch: refs/heads/master
Commit: 996d404ced9347aaa00de4356c07333d46322eae
Parents: 7610588
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Dec 1 20:21:25 2014 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Jan 21 12:01:35 2015 +0100

----------------------------------------------------------------------
 .../runtime/io/disk/FileChannelInputView.java   | 148 ++++++++
 .../runtime/io/disk/FileChannelOutputView.java  | 144 ++++++++
 .../io/disk/SeekableFileChannelInputView.java   | 186 ++++++++++
 .../disk/iomanager/AbstractFileIOChannel.java   |  13 +-
 .../disk/iomanager/AsynchronousBlockReader.java |   7 +-
 .../iomanager/AsynchronousFileIOChannel.java    |  18 +-
 .../io/disk/iomanager/BlockChannelReader.java   |   7 +
 .../io/disk/iomanager/FileIOChannel.java        |   9 +-
 .../io/disk/iomanager/IOManagerAsync.java       |  14 +-
 .../runtime/memorymanager/MemoryManager.java    |   3 +-
 .../io/disk/FileChannelStreamsITCase.java       | 307 ++++++++++++++++
 .../runtime/io/disk/FileChannelStreamsTest.java | 119 ++++++
 .../disk/SeekableFileChannelInputViewTest.java  | 157 ++++++++
 .../AsynchronousFileIOChannelsTest.java         | 175 +++++++++
 .../io/disk/iomanager/IOManagerAsyncTest.java   | 359 +++++++++++++++++++
 .../io/disk/iomanager/IOManagerITCase.java      |  66 +---
 .../io/disk/iomanager/IOManagerTest.java        | 237 +++---------
 .../operators/testutils/PairGenerator.java      | 161 +++++++++
 18 files changed, 1872 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
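
For orientation, this is roughly how the new paged views fit together: a FileChannelOutputView spills records block by block through a BlockChannelWriter, and a FileChannelInputView later streams them back, needing only the fill grade of the last block to know where the data ends. The sketch below is modeled on the FileChannelStreamsITCase added in this commit; the class name, record count, and memory sizes are illustrative, and DummyInvokable is the test-utility memory owner used by those tests.

import java.util.List;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.FileChannelInputView;
import org.apache.flink.runtime.io.disk.FileChannelOutputView;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;

public class PagedIORoundTripSketch {

	public static void main(String[] args) throws Exception {
		final int pageSize = 32 * 1024;
		final int numRecords = 100000;

		final IOManager ioManager = new IOManagerAsync();
		final MemoryManager memManager = new DefaultMemoryManager(4 * pageSize, 1, pageSize);

		try {
			// spill a run of integers through the paged output view
			List<MemorySegment> writeMemory = memManager.allocatePages(new DummyInvokable(), 2);
			FileIOChannel.ID channel = ioManager.createChannel();
			BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
			FileChannelOutputView out = new FileChannelOutputView(writer, memManager, writeMemory, pageSize);

			for (int i = 0; i < numRecords; i++) {
				out.writeInt(i);
			}
			out.close(); // flushes the partially filled last block and returns the memory

			// stream the data back; the last block's fill grade comes from the output view
			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), 2);
			BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
			FileChannelInputView in = new FileChannelInputView(reader, memManager, readMemory, out.getBytesInLatestSegment());

			for (int i = 0; i < numRecords; i++) {
				if (in.readInt() != i) {
					throw new IllegalStateException("read data does not match written data");
				}
			}
			in.closeAndDelete(); // closes the view, returns the memory, and removes the spill file
		}
		finally {
			ioManager.shutdown();
			memManager.shutdown();
		}
	}
}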


http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
new file mode 100644
index 0000000..9fb8072
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelInputView.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.MathUtils;
+
+/**
+ * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a {@link BlockChannelReader},
+ * making it effectively a data input stream. The view reads its data in blocks from the underlying channel.
+ * The view can read data that has been written by a {@link FileChannelOutputView}, or that was written in blocks
+ * in another fashion.
+ */
+public class FileChannelInputView extends AbstractPagedInputView {
+	
+	private final BlockChannelReader reader;
+	
+	private final MemoryManager memManager;
+	
+	private final List<MemorySegment> memory;
+	
+	private final int sizeOfLastBlock;
+	
+	private int numRequestsRemaining;
+	
+	private int numBlocksRemaining;
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public FileChannelInputView(BlockChannelReader reader, MemoryManager memManager, List<MemorySegment> memory, int sizeOfLastBlock) throws IOException {
+		super(0);
+		
+		checkNotNull(reader);
+		checkNotNull(memManager);
+		checkNotNull(memory);
+		checkArgument(!reader.isClosed());
+		checkArgument(memory.size() > 0);
+		
+		this.reader = reader;
+		this.memManager = memManager;
+		this.memory = memory;
+		this.sizeOfLastBlock = sizeOfLastBlock;
+		
+		try {
+			final long channelLength = reader.getSize();
+			final int segmentSize = memManager.getPageSize();
+			
+			this.numBlocksRemaining = MathUtils.checkedDownCast(channelLength / segmentSize);
+			if (channelLength % segmentSize != 0) {
+				this.numBlocksRemaining++;
+			}
+			
+			this.numRequestsRemaining = numBlocksRemaining;
+			
+			for (int i = 0; i < memory.size(); i++) {
+				sendReadRequest(memory.get(i));
+			}
+			
+			advance();
+		}
+		catch (IOException e) {
+			memManager.release(memory);
+			throw e;
+		}
+	}
+	
+	public void close() throws IOException {
+		close(false);
+	}
+	
+	public void closeAndDelete() throws IOException {
+		close(true);
+	}
+	
+	private void close(boolean deleteFile) throws IOException {
+		try {
+			clear();
+			if (deleteFile) {
+				reader.closeAndDelete();
+			} else {
+				reader.close();
+			}
+		} finally {
+			synchronized (memory) {
+				memManager.release(memory);
+				memory.clear();
+			}
+		}
+	}
+	
+	@Override
+	protected MemorySegment nextSegment(MemorySegment current) throws IOException {
+		// check for end-of-stream
+		if (numBlocksRemaining <= 0) {
+			reader.close();
+			throw new EOFException();
+		}
+		
+		// send a request first. if we have only a single segment, this same segment will be the one obtained in the next lines
+		if (current != null) {
+			sendReadRequest(current);
+		}
+		
+		// get the next segment
+		numBlocksRemaining--;
+		return reader.getNextReturnedSegment();
+	}
+	
+	@Override
+	protected int getLimitForSegment(MemorySegment segment) {
+		return numBlocksRemaining > 0 ? segment.size() : sizeOfLastBlock;
+	}
+	
+	private void sendReadRequest(MemorySegment seg) throws IOException {
+		if (numRequestsRemaining > 0) {
+			reader.readBlock(seg);
+			numRequestsRemaining--;
+		} else {
+			memManager.release(seg);
+		}
+	}
+}
\ No newline at end of file
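
Like a plain data input stream, the view signals exhaustion with an EOFException rather than a sentinel value, and a second close() is harmless (see the FileChannelStreamsTest and FileChannelStreamsITCase further below). A minimal sketch of that contract; the class and method names here are illustrative only.

import java.io.EOFException;
import java.io.IOException;

import org.apache.flink.runtime.io.disk.FileChannelInputView;

public class DrainInputViewSketch {

	/** Reads ints from the view until the spilled data is exhausted, then closes it. */
	public static int drain(FileChannelInputView in) throws IOException {
		int count = 0;
		try {
			while (true) {
				in.readInt(); // the view behaves like a DataInputStream over the spilled blocks
				count++;
			}
		}
		catch (EOFException eof) {
			// all blocks have been consumed; the view has already closed the underlying reader
		}
		in.close(); // returns the memory segments to the memory manager; safe after EOF
		return count;
	}
}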

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
new file mode 100644
index 0000000..2b8b728
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileChannelOutputView.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+
+/**
+ * A {@link org.apache.flink.core.memory.DataOutputView} that is backed by a {@link BlockChannelWriter}, making it effectively a data output
+ * stream. The view writes its data in blocks to the underlying channel.
+ */
+public class FileChannelOutputView extends AbstractPagedOutputView {
+	
+	private final BlockChannelWriter writer;		// the writer to the channel
+	
+	private final MemoryManager memManager;
+	
+	private final List<MemorySegment> memory;
+	
+	private int numBlocksWritten;
+	
+	private int bytesInLatestSegment;
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public FileChannelOutputView(BlockChannelWriter writer, MemoryManager memManager, List<MemorySegment> memory, int segmentSize) throws IOException {
+		super(segmentSize, 0);
+		
+		checkNotNull(writer);
+		checkNotNull(memManager);
+		checkNotNull(memory);
+		checkArgument(!writer.isClosed());
+		
+		this.writer = writer;
+		this.memManager = memManager;
+		this.memory = memory;
+		
+		
+		for (MemorySegment next : memory) {
+			writer.getReturnQueue().add(next);
+		}
+		
+		// move to the first page
+		advance();
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Closes this output, writing pending data and releasing the memory.
+	 * 
+	 * @throws IOException Thrown, if the pending data could not be written.
+	 */
+	public void close() throws IOException {
+		close(false);
+	}
+	
+	/**
+	 * Closes this output, writing the pending data, releasing the memory, and deleting the underlying channel file.
+	 * 
+	 * @throws IOException Thrown, if the pending data could not be written.
+	 */
+	public void closeAndDelete() throws IOException {
+		close(true);
+	}
+	
+	private void close(boolean delete) throws IOException {
+		try {
+			// send off the last segment, if we have not been closed before
+			MemorySegment current = getCurrentSegment();
+			if (current != null) {
+				writeSegment(current, getCurrentPositionInSegment());
+			}
+
+			clear();
+			if (delete) {
+				writer.closeAndDelete();
+			} else {
+				writer.close();
+			}
+		}
+		finally {
+			memManager.release(memory);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Gets the number of blocks written by this output view.
+	 * 
+	 * @return The number of blocks written by this output view.
+	 */
+	public int getBlockCount() {
+		return numBlocksWritten;
+	}
+	
+	/**
+	 * Gets the number of bytes written in the latest memory segment.
+	 * 
+	 * @return The number of bytes written in the latest memory segment.
+	 */
+	public int getBytesInLatestSegment() {
+		return bytesInLatestSegment;
+	}
+	
+	@Override
+	protected MemorySegment nextSegment(MemorySegment current, int posInSegment) throws IOException {
+		if (current != null) {
+			writeSegment(current, posInSegment);
+		}
+		return writer.getNextReturnedSegment();
+	}
+	
+	private void writeSegment(MemorySegment segment, int writePosition) throws IOException {
+		writer.writeBlock(segment);
+		numBlocksWritten++;
+		bytesInLatestSegment = writePosition;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
new file mode 100644
index 0000000..e97a1ff
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputView.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memorymanager.AbstractPagedInputView;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.util.MathUtils;
+
+/**
+ * A {@link org.apache.flink.core.memory.DataInputView} that is backed by a {@link BlockChannelReader},
+ * making it effectively a data input stream. The view reads its data in blocks from the underlying channel.
+ * The view can read data that has been written by a {@link FileChannelOutputView}, or that was written in blocks
+ * in another fashion.
+ */
+public class SeekableFileChannelInputView extends AbstractPagedInputView {
+	
+	private BlockChannelReader reader;
+	
+	private final IOManager ioManager;
+	
+	private final FileIOChannel.ID channelId;
+	
+	private final MemoryManager memManager;
+	
+	private final List<MemorySegment> memory;
+	
+	private final int sizeOfLastBlock;
+	
+	private final int numBlocksTotal;
+	
+	private final int segmentSize;
+	
+	private int numRequestsRemaining;
+	
+	private int numBlocksRemaining;
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public SeekableFileChannelInputView(IOManager ioManager, FileIOChannel.ID channelId, MemoryManager memManager, List<MemorySegment> memory, int sizeOfLastBlock) throws IOException {
+		super(0);
+		
+		checkNotNull(ioManager);
+		checkNotNull(channelId);
+		checkNotNull(memManager);
+		checkNotNull(memory);
+		
+		this.ioManager = ioManager;
+		this.channelId = channelId;
+		this.memManager = memManager;
+		this.memory = memory;
+		this.sizeOfLastBlock = sizeOfLastBlock;
+		this.segmentSize = memManager.getPageSize();
+		
+		this.reader = ioManager.createBlockChannelReader(channelId);
+		
+		try {
+			final long channelLength = reader.getSize();
+			
+			final int blockCount =  MathUtils.checkedDownCast(channelLength / segmentSize);
+			this.numBlocksTotal = (channelLength % segmentSize == 0) ? blockCount : blockCount + 1;
+
+			this.numBlocksRemaining = this.numBlocksTotal;
+			this.numRequestsRemaining = numBlocksRemaining;
+			
+			for (int i = 0; i < memory.size(); i++) {
+				sendReadRequest(memory.get(i));
+			}
+			
+			advance();
+		}
+		catch (IOException e) {
+			memManager.release(memory);
+			throw e;
+		}
+	}
+	
+	public void seek(long position) throws IOException {
+		final int block = MathUtils.checkedDownCast(position / segmentSize);
+		final int positionInBlock = (int) (position % segmentSize);
+		
+		if (position < 0 || block >= numBlocksTotal || (block == numBlocksTotal - 1 && positionInBlock > sizeOfLastBlock)) {
+			throw new IllegalArgumentException("Position is out of range");
+		}
+		
+		clear();
+		if (reader != null) {
+			reader.close();
+		}
+		
+		reader = ioManager.createBlockChannelReader(channelId);
+		
+		if (block > 0) {
+			reader.seekToPosition(block * segmentSize);
+		}
+		
+		this.numBlocksRemaining = this.numBlocksTotal - block;
+		this.numRequestsRemaining = numBlocksRemaining;
+		
+		for (int i = 0; i < memory.size(); i++) {
+			sendReadRequest(memory.get(i));
+		}
+		
+		numBlocksRemaining--;
+		seekInput(reader.getNextReturnedSegment(), positionInBlock, numBlocksRemaining == 0 ? sizeOfLastBlock : segmentSize);
+	}
+	
+	public void close() throws IOException {
+		close(false);
+	}
+	
+	public void closeAndDelete() throws IOException {
+		close(true);
+	}
+	
+	private void close(boolean deleteFile) throws IOException {
+		try {
+			clear();
+			if (deleteFile) {
+				reader.closeAndDelete();
+			} else {
+				reader.close();
+			}
+		} finally {
+			synchronized (memory) {
+				memManager.release(memory);
+				memory.clear();
+			}
+		}
+	}
+	
+	@Override
+	protected MemorySegment nextSegment(MemorySegment current) throws IOException {
+		// check for end-of-stream
+		if (numBlocksRemaining <= 0) {
+			reader.close();
+			throw new EOFException();
+		}
+		
+		// send a request first. if we have only a single segment, this same segment will be the one obtained in the next lines
+		if (current != null) {
+			sendReadRequest(current);
+		}
+		
+		// get the next segment
+		numBlocksRemaining--;
+		return reader.getNextReturnedSegment();
+	}
+	
+	@Override
+	protected int getLimitForSegment(MemorySegment segment) {
+		return numBlocksRemaining > 0 ? segment.size() : sizeOfLastBlock;
+	}
+	
+	private void sendReadRequest(MemorySegment seg) throws IOException {
+		if (numRequestsRemaining > 0) {
+			reader.readBlock(seg);
+			numRequestsRemaining--;
+		}
+	}
+}
\ No newline at end of file
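
The seekable variant reopens the block reader at the page that contains the target offset, so spilled data can be re-read from an arbitrary position without scanning from the start. A rough usage sketch, modeled on the SeekableFileChannelInputViewTest added below; the class name, page size, and offsets are illustrative.

import java.util.List;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.FileChannelOutputView;
import org.apache.flink.runtime.io.disk.SeekableFileChannelInputView;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;

public class SeekableViewSketch {

	public static void main(String[] args) throws Exception {
		final int pageSize = 16 * 1024;
		final int numBytes = 100000; // a bit more than six pages of data

		final IOManager ioManager = new IOManagerAsync();
		final MemoryManager memManager = new DefaultMemoryManager(4 * pageSize, 1, pageSize);

		try {
			// spill integers so that the value written at byte offset i is i itself
			List<MemorySegment> writeMemory = memManager.allocatePages(new DummyInvokable(), 2);
			FileIOChannel.ID channel = ioManager.createChannel();
			BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
			FileChannelOutputView out = new FileChannelOutputView(writer, memManager, writeMemory, pageSize);
			for (int i = 0; i < numBytes; i += 4) {
				out.writeInt(i);
			}
			out.close();

			// reopen the channel through the seekable view and jump into the third page
			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), 2);
			SeekableFileChannelInputView in = new SeekableFileChannelInputView(
					ioManager, channel, memManager, readMemory, out.getBytesInLatestSegment());

			in.seek(2 * pageSize);
			int atThirdPage = in.readInt(); // 2 * pageSize, because the offset encodes the value

			in.seek(0);
			int first = in.readInt();       // 0

			System.out.println(atThirdPage + " / " + first);
			in.closeAndDelete();
		}
		finally {
			ioManager.shutdown();
			memManager.shutdown();
		}
	}
}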

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
index ecb794e..3991167 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AbstractFileIOChannel.java
@@ -64,17 +64,18 @@ public abstract class AbstractFileIOChannel implements FileIOChannel {
 	
 	// --------------------------------------------------------------------------------------------
 
-	/**
-	 * Gets the channel ID of this channel.
-	 * 
-	 * @return This channel's ID.
-	 */
 	@Override
 	public final FileIOChannel.ID getChannelID() {
 		return this.id;
 	}
 	
 	@Override
+	public long getSize() throws IOException {
+		FileChannel channel = fileChannel;
+		return channel == null ? 0 : channel.size();
+	}
+	
+	@Override
 	public abstract boolean isClosed();
 	
 	@Override
@@ -103,4 +104,4 @@ public abstract class AbstractFileIOChannel implements FileIOChannel {
 			deleteChannel();
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
index a15acb5..acfa71f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBlockReader.java
@@ -115,4 +115,9 @@ public class AsynchronousBlockReader extends AsynchronousFileIOChannel<MemorySeg
 	public LinkedBlockingQueue<MemorySegment> getReturnQueue() {
 		return this.returnSegments;
 	}
-}
+	
+	@Override
+	public void seekToPosition(long position) throws IOException {
+		this.requestQueue.add(new SeekRequest(this, position));
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index 89ebb25..9a9ee61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -72,7 +72,7 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
 	 * @throws IOException Thrown, if the channel could no be opened.
 	 */
 	protected AsynchronousFileIOChannel(FileIOChannel.ID channelID, RequestQueue<R> requestQueue, 
-			RequestDoneCallback<T> callback, boolean writeEnabled) throws IOException
+			RequestDoneCallback callback, boolean writeEnabled) throws IOException
 	{
 		super(channelID, writeEnabled);
 
@@ -113,7 +113,9 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
 						this.closeLock.wait(1000);
 						checkErroneous();
 					}
-					catch (InterruptedException ignored) {}
+					catch (InterruptedException iex) {
+						throw new IOException("Closing of asynchronous file channel was interrupted.");
+					}
 				}
 			}
 			finally {
@@ -181,13 +183,11 @@ public abstract class AsynchronousFileIOChannel<T, R extends IORequest> extends
 			}
 		}
 		finally {
-			// decrement the number of missing buffers. If we are currently closing, notify the 
-			if (this.closed) {
-				synchronized (this.closeLock) {
-					int num = this.requestsNotReturned.decrementAndGet();
-					if (num == 0) {
-						this.closeLock.notifyAll();
-					}
+			// decrement the number of missing buffers. If we are currently closing, notify the waiters
+			synchronized (this.closeLock) {
+				final int num = this.requestsNotReturned.decrementAndGet();
+				if (this.closed && num == 0) {
+					this.closeLock.notifyAll();
 				}
 			}
 			else {

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
index f25827a..8f7f218 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/BlockChannelReader.java
@@ -60,5 +60,12 @@ public interface BlockChannelReader extends FileIOChannel {
 	 * @return The queue with the full memory segments.
 	 */
 	LinkedBlockingQueue<MemorySegment> getReturnQueue();
+	
+	/**
+	 * Seeks the underlying file channel to the given position.
+	 * 
+	 * @param position The position to seek to.
+	 */
+	void seekToPosition(long position) throws IOException;
 }
 	
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index 7c9d31b..d6f4458 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -38,6 +38,13 @@ public interface FileIOChannel {
 	FileIOChannel.ID getChannelID();
 	
 	/**
+	 * Gets the size (in bytes) of the file underlying the channel.
+	 * 
+	 * @return The size (in bytes) of the file underlying the channel.
+	 */
+	long getSize() throws IOException;
+	
+	/**
 	 * Checks whether the channel has been closed.
 	 * 
 	 * @return True if the channel has been closed, false otherwise.
@@ -153,4 +160,4 @@ public interface FileIOChannel {
 			return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
 		}
 	}
-}
+}
\ No newline at end of file
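
The new getSize() method is what lets the views above size themselves: the number of blocks occupied by a spilled channel is the file length divided by the page size, rounded up, exactly as the FileChannelInputView and SeekableFileChannelInputView constructors compute it. A small sketch of that arithmetic (the helper name is illustrative):

import java.io.IOException;

import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;

public class ChannelBlockCount {

	/** Number of pages occupied by the channel's file, i.e. ceil(getSize() / pageSize). */
	public static long numBlocks(FileIOChannel channel, int pageSize) throws IOException {
		final long size = channel.getSize(); // length of the underlying file in bytes
		final long fullBlocks = size / pageSize;
		return (size % pageSize == 0) ? fullBlocks : fullBlocks + 1;
	}
}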

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 7de8651..6489396 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -231,6 +231,18 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 		checkState(!shutdown, "I/O-Manger is closed.");
 		return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
 	}
+	
+	// -------------------------------------------------------------------------
+	//                             For Testing
+	// -------------------------------------------------------------------------
+	
+	RequestQueue<ReadRequest> getReadRequestQueue(FileIOChannel.ID channelID) {
+		return this.readers[channelID.getThreadNum()].requestQueue;
+	}
+	
+	RequestQueue<WriteRequest> getWriteRequestQueue(FileIOChannel.ID channelID) {
+		return this.writers[channelID.getThreadNum()].requestQueue;
+	}
 
 	// -------------------------------------------------------------------------
 	//                           I/O Worker Threads
@@ -446,4 +458,4 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 		}
 		
 	}; // end writer thread
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
index 875b223..1ab6931 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/MemoryManager.java
@@ -119,8 +119,7 @@ public interface MemoryManager {
 	void shutdown();
 	
 	/**
-	 * Checks if the memory manager all memory available and the descriptors of the free segments
-	 * describe a contiguous memory layout.
+	 * Checks if the memory manager has all memory available.
 	 * 
 	 * @return True, if the memory manager is empty and valid, false if it is not empty or corrupted.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
new file mode 100644
index 0000000..27928a9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsITCase.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.PairGenerator;
+import org.apache.flink.runtime.operators.testutils.PairGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.PairGenerator.Pair;
+import org.apache.flink.runtime.operators.testutils.PairGenerator.ValueMode;
+
+import java.io.EOFException;
+import java.util.List;
+
+public class FileChannelStreamsITCase {
+	
+	private static final long SEED = 649180756312423613L;
+
+	private static final int KEY_MAX = Integer.MAX_VALUE;
+
+	private static final int VALUE_SHORT_LENGTH = 114;
+	
+	private static final int VALUE_LONG_LENGTH = 112 * 1024;
+
+	private static final int NUM_PAIRS_SHORT = 1000000;
+	
+	private static final int NUM_PAIRS_LONG = 3000;
+	
+	private static final int MEMORY_PAGE_SIZE = 32 * 1024;
+	
+	private static final int NUM_MEMORY_SEGMENTS = 3;
+
+	private IOManager ioManager;
+
+	private MemoryManager memManager;
+
+	// --------------------------------------------------------------------------------------------
+
+	@Before
+	public void beforeTest() {
+		memManager = new DefaultMemoryManager(NUM_MEMORY_SEGMENTS * MEMORY_PAGE_SIZE, 1, MEMORY_PAGE_SIZE);
+		ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest() {
+		ioManager.shutdown();
+		assertTrue("I/O Manager was not properly shut down.", ioManager.isProperlyShutDown());
+		assertTrue("The memory has not been properly released", memManager.verifyEmpty());
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	@Test
+	public void testWriteReadSmallRecords() {
+		try {
+			List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+			
+			final PairGenerator generator = new PairGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final FileIOChannel.ID channel = ioManager.createChannel();
+			
+			// create the writer output view
+			final BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
+			final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
+			
+			// write a number of pairs
+			Pair pair = new Pair();
+			for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+				generator.next(pair);
+				pair.write(outView);
+			}
+			outView.close();
+			
+			// create the reader input view
+			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+			
+			final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
+			generator.reset();
+			
+			// read and re-generate all records and compare them
+			Pair readPair = new Pair();
+			for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+				generator.next(pair);
+				readPair.read(inView);
+				assertEquals("The re-generated and the read record do not match.", pair, readPair);
+			}
+			
+			inView.close();
+			reader.deleteChannel();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testWriteAndReadLongRecords() {
+		try {
+			final List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+			
+			final PairGenerator generator = new PairGenerator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final FileIOChannel.ID channel = this.ioManager.createChannel();
+			
+			// create the writer output view
+			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+			final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
+			
+			// write a number of pairs
+			Pair pair = new Pair();
+			for (int i = 0; i < NUM_PAIRS_LONG; i++) {
+				generator.next(pair);
+				pair.write(outView);
+			}
+			outView.close();
+			
+			// create the reader input view
+			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+			
+			final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
+			generator.reset();
+			
+			// read and re-generate all records and compare them
+			Pair readPair = new Pair();
+			for (int i = 0; i < NUM_PAIRS_LONG; i++) {
+				generator.next(pair);
+				readPair.read(inView);
+				assertEquals("The re-generated and the read record do not match.", pair, readPair);
+			}
+			
+			inView.close();
+			reader.deleteChannel();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testReadTooMany() {
+		try {
+			final List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+			
+			final PairGenerator generator = new PairGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final FileIOChannel.ID channel = this.ioManager.createChannel();
+			
+			// create the writer output view
+			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+			final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
+	
+			// write a number of pairs
+			Pair pair = new Pair();
+			for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+				generator.next(pair);
+				pair.write(outView);
+			}
+			outView.close();
+	
+			// create the reader input view
+			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+			
+			final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
+			generator.reset();
+	
+			// read and re-generate all records and compare them
+			try {
+				Pair readPair = new Pair();
+				for (int i = 0; i < NUM_PAIRS_SHORT + 1; i++) {
+					generator.next(pair);
+					readPair.read(inView);
+					assertEquals("The re-generated and the read record do not match.", pair, readPair);
+				}
+				fail("Expected an EOFException which did not occur.");
+			}
+			catch (EOFException eofex) {
+				// expected
+			}
+			
+			inView.close();
+			reader.deleteChannel();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testWriteReadOneBufferOnly() {
+		try {
+			final List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), 1);
+			
+			final PairGenerator generator = new PairGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final FileIOChannel.ID channel = this.ioManager.createChannel();
+			
+			// create the writer output view
+			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+			final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
+			
+			// write a number of pairs
+			Pair pair = new Pair();
+			for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+				generator.next(pair);
+				pair.write(outView);
+			}
+			outView.close();
+			
+			// create the reader input view
+			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), 1);
+			
+			final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
+			generator.reset();
+			
+			// read and re-generate all records and compare them
+			Pair readPair = new Pair();
+			for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+				generator.next(pair);
+				readPair.read(inView);
+				assertEquals("The re-generated and the read record do not match.", pair, readPair);
+			}
+			
+			inView.close();
+			reader.deleteChannel();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testWriteReadNotAll() {
+		try {
+			final List<MemorySegment> memory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+			
+			final PairGenerator generator = new PairGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			final FileIOChannel.ID channel = this.ioManager.createChannel();
+			
+			// create the writer output view
+			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channel);
+			final FileChannelOutputView outView = new FileChannelOutputView(writer, memManager, memory, MEMORY_PAGE_SIZE);
+			
+			// write a number of pairs
+			Pair pair = new Pair();
+			for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
+				generator.next(pair);
+				pair.write(outView);
+			}
+			outView.close();
+			
+			// create the reader input view
+			List<MemorySegment> readMemory = memManager.allocatePages(new DummyInvokable(), NUM_MEMORY_SEGMENTS);
+			
+			final BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			final FileChannelInputView inView = new FileChannelInputView(reader, memManager, readMemory, outView.getBytesInLatestSegment());
+			generator.reset();
+			
+			// read and re-generate all records and compare them
+			Pair readPair = new Pair();
+			for (int i = 0; i < NUM_PAIRS_SHORT / 2; i++) {
+				generator.next(pair);
+				readPair.read(inView);
+				assertEquals("The re-generated and the read record do not match.", pair, readPair);
+			}
+			
+			inView.close();
+			reader.deleteChannel();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
new file mode 100644
index 0000000..1db2a6f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/FileChannelStreamsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.types.StringValue;
+import org.junit.Test;
+
+
+public class FileChannelStreamsTest {
+
+	@Test
+	public void testCloseAndDeleteOutputView() {
+		final IOManager ioManager = new IOManagerAsync();
+		try {
+			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024);
+			List<MemorySegment> memory = new ArrayList<MemorySegment>();
+			memMan.allocatePages(new DummyInvokable(), memory, 4);
+			
+			FileIOChannel.ID channel = ioManager.createChannel();
+			BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
+			
+			FileChannelOutputView out = new FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
+			new StringValue("Some test text").write(out);
+			
+			// close for the first time, make sure all memory returns
+			out.close();
+			assertTrue(memMan.verifyEmpty());
+			
+			// close again, should not cause an exception
+			out.close();
+			
+			// delete, make sure file is removed
+			out.closeAndDelete();
+			assertFalse(new File(channel.getPath()).exists());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			ioManager.shutdown();
+		}
+	}
+	
+	@Test
+	public void testCloseAndDeleteInputView() {
+		final IOManager ioManager = new IOManagerAsync();
+		try {
+			MemoryManager memMan = new DefaultMemoryManager(4 * 16*1024, 1, 16*1024);
+			List<MemorySegment> memory = new ArrayList<MemorySegment>();
+			memMan.allocatePages(new DummyInvokable(), memory, 4);
+			
+			FileIOChannel.ID channel = ioManager.createChannel();
+			
+			// add some test data
+			{
+				FileWriter wrt = new FileWriter(channel.getPath());
+				wrt.write("test data");
+				wrt.close();
+			}
+			
+			BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
+			FileChannelInputView in = new FileChannelInputView(reader, memMan, memory, 9);
+			
+			// read just something
+			in.readInt();
+			
+			// close for the first time, make sure all memory returns
+			in.close();
+			assertTrue(memMan.verifyEmpty());
+			
+			// close again, should not cause an exception
+			in.close();
+			
+			// delete, make sure file is removed
+			in.closeAndDelete();
+			assertFalse(new File(channel.getPath()).exists());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			ioManager.shutdown();
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
new file mode 100644
index 0000000..7e4d70d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SeekableFileChannelInputViewTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk;
+
+import static org.junit.Assert.*;
+
+import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.junit.Test;
+
+
+public class SeekableFileChannelInputViewTest {
+
+	@Test
+	public void testSeek() {
+		final IOManager ioManager = new IOManagerAsync();
+		final int PAGE_SIZE = 16 * 1024;
+		final int NUM_RECORDS = 120000;
+		// integers across 7.x pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes)
+		
+		try {
+			MemoryManager memMan = new DefaultMemoryManager(4 * PAGE_SIZE, 1, PAGE_SIZE);
+			List<MemorySegment> memory = new ArrayList<MemorySegment>();
+			memMan.allocatePages(new DummyInvokable(), memory, 4);
+			
+			FileIOChannel.ID channel = ioManager.createChannel();
+			BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
+			FileChannelOutputView out = new FileChannelOutputView(writer, memMan, memory, memMan.getPageSize());
+			
+			// write some integers across 7.5 pages (7 pages = 114.688 bytes, 8 pages = 131.072 bytes)
+			for (int i = 0; i < NUM_RECORDS; i += 4) {
+				out.writeInt(i);
+			}
+			// close for the first time, make sure all memory returns
+			out.close();
+			assertTrue(memMan.verifyEmpty());
+			
+			memMan.allocatePages(new DummyInvokable(), memory, 4);
+			SeekableFileChannelInputView in = new SeekableFileChannelInputView(ioManager, channel, memMan, memory, out.getBytesInLatestSegment());
+			
+			// read first, complete
+			for (int i = 0; i < NUM_RECORDS; i += 4) {
+				assertEquals(i, in.readInt());
+			}
+			try {
+				in.readInt();
+				fail("should throw EOF exception");
+			} catch (EOFException e) {}
+			
+			// seek to the middle of the 3rd page
+			int i = 2 * PAGE_SIZE + PAGE_SIZE / 4;
+			in.seek(i);
+			for (; i < NUM_RECORDS; i += 4) {
+				assertEquals(i, in.readInt());
+			}
+			try {
+				in.readInt();
+				fail("should throw EOF exception");
+			} catch (EOFException e) {}
+			
+			// seek to the end
+			i = 120000 - 4;
+			in.seek(i);
+			for (; i < NUM_RECORDS; i += 4) {
+				assertEquals(i, in.readInt());
+			}
+			try {
+				in.readInt();
+				fail("should throw EOF exception");
+			} catch (EOFException e) {}
+			
+			// seek to the beginning
+			i = 0;
+			in.seek(i);
+			for (; i < NUM_RECORDS; i += 4) {
+				assertEquals(i, in.readInt());
+			}
+			try {
+				in.readInt();
+				fail("should throw EOF exception");
+			} catch (EOFException e) {}
+			
+			// seek to after a page
+			i = PAGE_SIZE;
+			in.seek(i);
+			for (; i < NUM_RECORDS; i += 4) {
+				assertEquals(i, in.readInt());
+			}
+			try {
+				in.readInt();
+				fail("should throw EOF exception");
+			} catch (EOFException e) {}
+			
+			// seek to after a page
+			i = 3 * PAGE_SIZE;
+			in.seek(i);
+			for (; i < NUM_RECORDS; i += 4) {
+				assertEquals(i, in.readInt());
+			}
+			try {
+				in.readInt();
+				fail("should throw EOF exception");
+			} catch (EOFException e) {}
+			
+			// seek to the end
+			i = NUM_RECORDS;
+			in.seek(i);
+			try {
+				in.readInt();
+				fail("should throw EOF exception");
+			} catch (EOFException e) {}
+			
+			// seek out of bounds
+			try {
+				in.seek(-10);
+				fail("should throw an exception");
+			} catch (IllegalArgumentException e) {}
+			try {
+				in.seek(NUM_RECORDS + 1);
+				fail("should throw an exception");
+			} catch (IllegalArgumentException e) {}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			ioManager.shutdown();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
new file mode 100644
index 0000000..1e9d4d4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelsTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.junit.Test;
+
+public class AsynchronousFileIOChannelsTest {
+
+	@Test
+	public void testClosingWaits() {
+		IOManagerAsync ioMan = new IOManagerAsync();
+		try {
+			
+			final int NUM_BLOCKS = 100;
+			final MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
+			
+			final AtomicInteger callbackCounter = new AtomicInteger();
+			final AtomicBoolean exceptionOccurred = new AtomicBoolean();
+			
+			final RequestDoneCallback callback = new RequestDoneCallback() {
+				
+				@Override
+				public void requestSuccessful(MemorySegment buffer) {
+					// we do the non safe variant. the callbacks should come in order from
+					// the same thread, so it should always work
+					callbackCounter.set(callbackCounter.get() + 1);
+					
+					if (buffer != seg) {
+						exceptionOccurred.set(true);
+					}
+				}
+				
+				@Override
+				public void requestFailed(MemorySegment buffer, IOException e) {
+					exceptionOccurred.set(true);
+				}
+			};
+			
+			BlockChannelWriterWithCallback writer = ioMan.createBlockChannelWriter(ioMan.createChannel(), callback);
+			try {
+				for (int i = 0; i < NUM_BLOCKS; i++) {
+					writer.writeBlock(seg);
+				}
+				
+				writer.close();
+				
+				assertEquals(NUM_BLOCKS, callbackCounter.get());
+				assertFalse(exceptionOccurred.get());
+			}
+			finally {
+				writer.closeAndDelete();
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			ioMan.shutdown();
+		}
+	}
+	
+	@Test
+	public void testExceptionForwardsToClose() {
+		IOManagerAsync ioMan = new IOManagerAsync();
+		try {
+			testExceptionForwardsToClose(ioMan, 100, 1);
+			testExceptionForwardsToClose(ioMan, 100, 50);
+			testExceptionForwardsToClose(ioMan, 100, 100);
+		}
+		finally {
+			ioMan.shutdown();
+		}
+	}
+	
+	private void testExceptionForwardsToClose(IOManagerAsync ioMan, final int numBlocks, final int failingBlock) {
+		try {
+			MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
+			FileIOChannel.ID channelId = ioMan.createChannel();
+			
+			BlockChannelWriterWithCallback writer = new AsynchronousBlockWriterWithCallback(channelId, 
+					ioMan.getWriteRequestQueue(channelId), new NoOpCallback()) {
+				
+				private int numBlocks;
+				
+				@Override
+				public void writeBlock(MemorySegment segment) throws IOException {
+					numBlocks++;
+					
+					if (numBlocks == failingBlock) {
+						this.requestsNotReturned.incrementAndGet();
+						this.requestQueue.add(new FailingWriteRequest(this, segment));
+					} else {
+						super.writeBlock(segment);
+					}
+				}
+			};
+			
+			try {
+				for (int i = 0; i < numBlocks; i++) {
+					writer.writeBlock(seg);
+				}
+				
+				writer.close();
+				fail("did not forward exception");
+			}
+			catch (IOException e) {
+				// expected
+			}
+			finally {
+				try {
+					writer.closeAndDelete();
+				} catch (Throwable t) {}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static class NoOpCallback implements RequestDoneCallback {
+
+		@Override
+		public void requestSuccessful(MemorySegment buffer) {}
+
+		@Override
+		public void requestFailed(MemorySegment buffer, IOException e) {}
+	}
+	
+	private static class FailingWriteRequest implements WriteRequest {
+		
+		private final AsynchronousFileIOChannel<WriteRequest> channel;
+		
+		private final MemorySegment segment;
+		
+		protected FailingWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel, MemorySegment segment) {
+			this.channel = targetChannel;
+			this.segment = segment;
+		}
+
+		@Override
+		public void write() throws IOException {
+			throw new IOException();
+		}
+
+		@Override
+		public void requestDone(IOException ioex) {
+			this.channel.handleProcessedBuffer(this.segment, ioex);
+		}
+	} 
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
new file mode 100644
index 0000000..297eeed
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.disk.iomanager;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.ReadRequest;
+import org.apache.flink.runtime.io.disk.iomanager.WriteRequest;
+
+public class IOManagerAsyncTest {
+	
+	private IOManagerAsync ioManager;
+	
+	// ------------------------------------------------------------------------
+	//                           Setup & Shutdown
+	// ------------------------------------------------------------------------
+	
+	@Before
+	public void beforeTest() {
+		ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest() {
+		this.ioManager.shutdown();
+		assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
+	}
+
+	// ------------------------------------------------------------------------
+	//                           Test Methods
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void channelReadWriteOneSegment() {
+		final int NUM_IOS = 1111;
+		
+		try {
+			final FileIOChannel.ID channelID = this.ioManager.createChannel();
+			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
+			
+			MemorySegment memSeg = new MemorySegment(new byte[32 * 1024]);
+			
+			for (int i = 0; i < NUM_IOS; i++) {
+				for (int pos = 0; pos < memSeg.size(); pos += 4) {
+					memSeg.putInt(pos, i);
+				}
+				
+				writer.writeBlock(memSeg);
+				memSeg = writer.getNextReturnedSegment();
+			}
+			
+			writer.close();
+			
+			final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channelID);
+			for (int i = 0; i < NUM_IOS; i++) {
+				reader.readBlock(memSeg);
+				memSeg = reader.getNextReturnedSegment();
+				
+				for (int pos = 0; pos < memSeg.size(); pos += 4) {
+					if (memSeg.getInt(pos) != i) {
+						fail("Read memory segment contains invalid data.");
+					}
+				}
+			}
+			
+			reader.closeAndDelete();
+		}
+		catch (Exception ex) {
+			ex.printStackTrace();
+			fail("Test encountered an exception: " + ex.getMessage());
+		}
+	}
+	
+	@Test
+	public void channelReadWriteMultipleSegments() {
+		final int NUM_IOS = 1111;
+		final int NUM_SEGS = 16;
+		
+		try {
+			final List<MemorySegment> memSegs = new ArrayList<MemorySegment>();
+			for (int i = 0; i < NUM_SEGS; i++) {
+				memSegs.add(new MemorySegment(new byte[32 * 1024]));
+			}
+			
+			final FileIOChannel.ID channelID = this.ioManager.createChannel();
+			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
+			
+			for (int i = 0; i < NUM_IOS; i++) {
+				final MemorySegment memSeg = memSegs.isEmpty() ? writer.getNextReturnedSegment() : memSegs.remove(memSegs.size() - 1);
+				
+				for (int pos = 0; pos < memSeg.size(); pos += 4) {
+					memSeg.putInt(pos, i);
+				}
+				
+				writer.writeBlock(memSeg);
+			}
+			writer.close();
+			
+			// get back the memory
+			while (memSegs.size() < NUM_SEGS) {
+				memSegs.add(writer.getNextReturnedSegment());
+			}
+			
+			final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channelID);
+			while(!memSegs.isEmpty()) {
+				reader.readBlock(memSegs.remove(0));
+			}
+			
+			for (int i = 0; i < NUM_IOS; i++) {
+				final MemorySegment memSeg = reader.getNextReturnedSegment();
+				
+				for (int pos = 0; pos < memSeg.size(); pos += 4) {
+					if (memSeg.getInt(pos) != i) {
+						fail("Read memory segment contains invalid data.");
+					}
+				}
+				reader.readBlock(memSeg);
+			}
+			
+			reader.closeAndDelete();
+			
+			// get back the memory
+			while (memSegs.size() < NUM_SEGS) {
+				memSegs.add(reader.getNextReturnedSegment());
+			}
+		}
+		catch (Exception ex) {
+			ex.printStackTrace();
+			fail("TEst encountered an exception: " + ex.getMessage());
+		}
+	}
+	
+	@Test
+	public void testExceptionPropagationReader() {
+		try {
+			// use atomic boolean as a boolean reference
+			final AtomicBoolean handlerCalled = new AtomicBoolean();
+			final AtomicBoolean exceptionForwarded = new AtomicBoolean();
+			
+			ReadRequest req = new ReadRequest() {
+				
+				@Override
+				public void requestDone(IOException ioex) {
+					if (ioex instanceof TestIOException) {
+						exceptionForwarded.set(true);
+					}
+					
+					synchronized (handlerCalled) {
+						handlerCalled.set(true);
+						handlerCalled.notifyAll();
+					}
+				}
+				
+				@Override
+				public void read() throws IOException {
+					throw new TestIOException();
+				}
+			};
+			
+			
+			// test the read queue
+			RequestQueue<ReadRequest> rq = ioManager.getReadRequestQueue(ioManager.createChannel());
+			rq.add(req);
+
+			// wait until the asynchronous request has been handled
+			synchronized (handlerCalled) {
+				while (!handlerCalled.get()) {
+					handlerCalled.wait();
+				}
+			}
+			
+			assertTrue(exceptionForwarded.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testExceptionPropagationWriter() {
+		try {
+			// use atomic boolean as a boolean reference
+			final AtomicBoolean handlerCalled = new AtomicBoolean();
+			final AtomicBoolean exceptionForwarded = new AtomicBoolean();
+			
+			WriteRequest req = new WriteRequest() {
+				
+				@Override
+				public void requestDone(IOException ioex) {
+					if (ioex instanceof TestIOException) {
+						exceptionForwarded.set(true);
+					}
+					
+					synchronized (handlerCalled) {
+						handlerCalled.set(true);
+						handlerCalled.notifyAll();
+					}
+				}
+				
+				@Override
+				public void write() throws IOException {
+					throw new TestIOException();
+				}
+			};
+			
+			
+			// test the read queue
+			RequestQueue<WriteRequest> rq = ioManager.getWriteRequestQueue(ioManager.createChannel());
+			rq.add(req);
+
+			// wait until the asynchronous request has been handled
+			synchronized (handlerCalled) {
+				while (!handlerCalled.get()) {
+					handlerCalled.wait();
+				}
+			}
+			
+			assertTrue(exceptionForwarded.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testExceptionInCallbackRead() {
+		try {
+			final AtomicBoolean handlerCalled = new AtomicBoolean();
+			
+			ReadRequest regularRequest = new ReadRequest() {
+				
+				@Override
+				public void requestDone(IOException ioex) {
+					synchronized (handlerCalled) {
+						handlerCalled.set(true);
+						handlerCalled.notifyAll();
+					}
+				}
+				
+				@Override
+				public void read() {}
+			};
+			
+			ReadRequest exceptionThrower = new ReadRequest() {
+				
+				@Override
+				public void requestDone(IOException ioex) {
+					throw new RuntimeException();
+				}
+				
+				@Override
+				public void read() {}
+			};
+			
+			RequestQueue<ReadRequest> rq = ioManager.getReadRequestQueue(ioManager.createChannel());
+			
+			// queue first an exception thrower, then a regular request.
+			// we check that the regular request gets successfully handled
+			rq.add(exceptionThrower);
+			rq.add(regularRequest);
+			
+			synchronized (handlerCalled) {
+				while (!handlerCalled.get()) {
+					handlerCalled.wait();
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testExceptionInCallbackWrite() {
+		try {
+			final AtomicBoolean handlerCalled = new AtomicBoolean();
+			
+			WriteRequest regularRequest = new WriteRequest() {
+				
+				@Override
+				public void requestDone(IOException ioex) {
+					synchronized (handlerCalled) {
+						handlerCalled.set(true);
+						handlerCalled.notifyAll();
+					}
+				}
+				
+				@Override
+				public void write() {}
+			};
+			
+			WriteRequest exceptionThrower = new WriteRequest() {
+				
+				@Override
+				public void requestDone(IOException ioex) {
+					throw new RuntimeException();
+				}
+				
+				@Override
+				public void write() {}
+			};
+			
+			RequestQueue<WriteRequest> rq = ioManager.getWriteRequestQueue(ioManager.createChannel());
+			
+			// queue first an exception thrower, then a regular request.
+			// we check that the regular request gets successfully handled
+			rq.add(exceptionThrower);
+			rq.add(regularRequest);
+			
+			synchronized (handlerCalled) {
+				while (!handlerCalled.get()) {
+					handlerCalled.wait();
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	
+	
+	final class TestIOException extends IOException {
+		private static final long serialVersionUID = -814705441998024472L;
+	}
+}
\ No newline at end of file

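The write-then-read round trip that channelReadWriteOneSegment verifies boils down to the sketch below. The wrapping class, loop bound, and sentinel value are invented; the reader and writer calls mirror the test above.

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;

public class BlockChannelRoundTripSketch {

    public static void main(String[] args) throws Exception {
        IOManagerAsync ioMan = new IOManagerAsync();
        try {
            FileIOChannel.ID channel = ioMan.createChannel();

            // write a handful of blocks; the writer hands segments back once they are flushed
            BlockChannelWriter writer = ioMan.createBlockChannelWriter(channel);
            MemorySegment seg = new MemorySegment(new byte[32 * 1024]);
            for (int i = 0; i < 8; i++) {
                seg.putInt(0, i);
                writer.writeBlock(seg);
                seg = writer.getNextReturnedSegment();   // blocks until one write has completed
            }
            writer.close();

            // read the blocks back in writing order
            BlockChannelReader reader = ioMan.createBlockChannelReader(channel);
            for (int i = 0; i < 8; i++) {
                reader.readBlock(seg);
                seg = reader.getNextReturnedSegment();
                if (seg.getInt(0) != i) {
                    throw new IllegalStateException("unexpected block contents");
                }
            }
            reader.closeAndDelete();   // also removes the backing file
        }
        finally {
            ioMan.shutdown();
        }
    }
}
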
http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index 78951d3..f1d5337 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -25,9 +25,9 @@ import java.util.List;
 import java.util.Random;
 
 import org.junit.Assert;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -36,22 +36,17 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.DefaultMemoryManagerTest;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
 
 /**
  * Integration test case for the I/O manager.
  */
 public class IOManagerITCase {
 	
-	private static final Logger LOG = LoggerFactory.getLogger(IOManagerITCase.class);
-	
 	private static final long SEED = 649180756312423613L;
 
-	private static final int NUMBER_OF_SEGMENTS = 10; // 10
-
-	private static final int SEGMENT_SIZE = 1024 * 1024; // 1M
+	private static final int MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL = 10;
+	
+	private static final int MEMORY_SIZE = 10 * 1024 * 1024; // 10 MB
 	
 	private final int NUM_CHANNELS = 29;
 	
@@ -63,7 +58,7 @@ public class IOManagerITCase {
 
 	@Before
 	public void beforeTest() {
-		memoryManager = new DefaultMemoryManager(NUMBER_OF_SEGMENTS * SEGMENT_SIZE, 1);
+		memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
 		ioManager = new IOManagerAsync();
 	}
 
@@ -84,10 +79,7 @@ public class IOManagerITCase {
 	 * parallel. It is designed to check the ability of the IO manager to correctly handle multiple threads.
 	 */
 	@Test
-	public void parallelChannelsTest() throws Exception
-	{
-		LOG.info("Starting parallel channels test.");
-		
+	public void parallelChannelsTest() throws Exception {
 		final Random rnd = new Random(SEED);
 		final AbstractInvokable memOwner = new DefaultMemoryManagerTest.DummyInvokable();
 		
@@ -106,7 +98,7 @@ public class IOManagerITCase {
 			ids[i] = this.ioManager.createChannel();
 			writers[i] = this.ioManager.createBlockChannelWriter(ids[i]);
 			
-			List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(NUMBER_OF_SEGMENTS - 2) + 2);
+			List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL - 1) + 1);
 			outs[i] = new ChannelWriterOutputView(writers[i], memSegs, this.memoryManager.getPageSize());
 		}
 		
@@ -114,24 +106,13 @@ public class IOManagerITCase {
 		Value val = new Value();
 		
 		// write a lot of values unevenly distributed over the channels
-		int nextLogCount = 0;
-		float nextLogFraction = 0.0f;
 		
-		LOG.info("Writing to channels...");
 		for (int i = 0; i < NUMBERS_TO_BE_WRITTEN; i++) {
-			
-			if (i == nextLogCount) {
-				LOG.info("... " + (int) (nextLogFraction * 100) + "% done.");
-				nextLogFraction += 0.05;
-				nextLogCount = (int) (nextLogFraction * NUMBERS_TO_BE_WRITTEN);
-			}
-			
 			int channel = skewedSample(rnd, NUM_CHANNELS - 1);
 			
 			val.value = String.valueOf(writingCounters[channel]++);
 			val.write(outs[channel]);
 		}
-		LOG.info("Writing done, flushing contents...");
 		
 		// close all writers
 		for (int i = 0; i < NUM_CHANNELS; i++) {
@@ -141,12 +122,9 @@ public class IOManagerITCase {
 		writers = null;
 		
 		// instantiate the readers for sequential read
-		LOG.info("Reading channels sequentially...");
-		for (int i = 0; i < NUM_CHANNELS; i++)
-		{
-			List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(NUMBER_OF_SEGMENTS - 2) + 2);
+		for (int i = 0; i < NUM_CHANNELS; i++) {
 			
-			LOG.info("Reading channel " + (i+1) + "/" + NUM_CHANNELS + '.');
+			List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL - 1) + 1);
 				
 			final BlockChannelReader reader = this.ioManager.createBlockChannelReader(ids[i]);
 			final ChannelReaderInputView in = new ChannelReaderInputView(reader, memSegs, false);
@@ -173,30 +151,19 @@ public class IOManagerITCase {
 			
 			this.memoryManager.release(in.close());
 		}
-		LOG.info("Sequential reading done.");
 		
 		// instantiate the readers
-		LOG.info("Reading channels randomly...");
 		for (int i = 0; i < NUM_CHANNELS; i++) {
 			
-			List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(NUMBER_OF_SEGMENTS - 2) + 2);
+			List<MemorySegment> memSegs = this.memoryManager.allocatePages(memOwner, rnd.nextInt(MAXIMUM_NUMBER_OF_SEGMENTS_PER_CHANNEL - 1) + 1);
 				
 			readers[i] = this.ioManager.createBlockChannelReader(ids[i]);
 			ins[i] = new ChannelReaderInputView(readers[i], memSegs, false);
 		}
 		
-		nextLogCount = 0;
-		nextLogFraction = 0.0f;
-		
 		// read a lot of values in a mixed order from the channels
 		for (int i = 0; i < NUMBERS_TO_BE_WRITTEN; i++) {
 			
-			if (i == nextLogCount) {
-				LOG.info("... " + (int) (nextLogFraction * 100) + "% done.");
-				nextLogFraction += 0.05;
-				nextLogCount = (int) (nextLogFraction * NUMBERS_TO_BE_WRITTEN);
-			}
-			
 			while (true) {
 				final int channel = skewedSample(rnd, NUM_CHANNELS - 1);
 				if (ins[channel] != null) {
@@ -222,7 +189,6 @@ public class IOManagerITCase {
 			}
 			
 		}
-		LOG.info("Random reading done.");
 		
 		// close all readers
 		for (int i = 0; i < NUM_CHANNELS; i++) {
@@ -256,10 +222,9 @@ public class IOManagerITCase {
 	
 	protected static class Value implements IOReadableWritable {
 
-		String value;
+		private String value;
 
-		public Value() {
-		}
+		public Value() {}
 
 		public Value(String val) {
 			this.value = val;
@@ -306,5 +271,4 @@ public class IOManagerITCase {
 			return true;
 		}
 	}
-
-}
+}
\ No newline at end of file

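The integration test above layers record-oriented channel views on top of the block readers and writers. A rough sketch of that pattern follows; the sizes and record counts are invented, and it assumes that ChannelWriterOutputView.close() hands back its memory segments the same way the reader view visibly does, which is not shown in this hunk.

import java.util.List;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.DefaultMemoryManagerTest;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.types.StringValue;

public class ChannelViewSketch {

    public static void main(String[] args) throws Exception {
        DefaultMemoryManager memoryManager = new DefaultMemoryManager(4 * 1024 * 1024, 1);
        IOManagerAsync ioManager = new IOManagerAsync();
        AbstractInvokable owner = new DefaultMemoryManagerTest.DummyInvokable();
        try {
            FileIOChannel.ID channel = ioManager.createChannel();

            // record-oriented writing on top of a block channel writer
            List<MemorySegment> writeMem = memoryManager.allocatePages(owner, 4);
            BlockChannelWriter writer = ioManager.createBlockChannelWriter(channel);
            ChannelWriterOutputView out =
                    new ChannelWriterOutputView(writer, writeMem, memoryManager.getPageSize());

            StringValue value = new StringValue();
            for (int i = 0; i < 1000; i++) {
                value.setValue(String.valueOf(i));
                value.write(out);
            }
            // assumption: close() flushes and returns the segments, mirroring the reader view
            memoryManager.release(out.close());

            // record-oriented reading
            List<MemorySegment> readMem = memoryManager.allocatePages(owner, 4);
            BlockChannelReader reader = ioManager.createBlockChannelReader(channel);
            ChannelReaderInputView in = new ChannelReaderInputView(reader, readMem, false);

            for (int i = 0; i < 1000; i++) {
                value.read(in);
            }
            memoryManager.release(in.close());
        }
        finally {
            ioManager.shutdown();
            memoryManager.shutdown();
        }
    }
}
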
http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index fa6cb80..ab5c206 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -16,233 +16,96 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.memory.DefaultMemoryManagerTest.DummyInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 
-public class IOManagerTest {
-	
-	// ------------------------------------------------------------------------
-	//                        Cross Test Fields
-	// ------------------------------------------------------------------------
-	
-	private IOManager ioManager;
-
-	private DefaultMemoryManager memoryManager;
-	
-	// ------------------------------------------------------------------------
-	//                           Setup & Shutdown
-	// ------------------------------------------------------------------------
-	
-	@Before
-	public void beforeTest() {
-		this.memoryManager = new DefaultMemoryManager(32 * 1024 * 1024, 1);
-		this.ioManager = new IOManagerAsync();
-	}
-
-	@After
-	public void afterTest() {
-		this.ioManager.shutdown();
-		Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
-		
-		Assert.assertTrue("Not all memory was returned to the memory manager in the test.", this.memoryManager.verifyEmpty());
-		this.memoryManager.shutdown();
-		this.memoryManager = null;
-	}
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel.ID;
+import org.junit.Test;
 
-	// ------------------------------------------------------------------------
-	//                           Test Methods
-	// ------------------------------------------------------------------------
-	
-	// ------------------------------------------------------------------------
+public class IOManagerTest {
 
-	/**
-	 * Tests that the channel enumerator creates channels in the temporary files directory.
-	 */
 	@Test
 	public void channelEnumerator() {
-		File tempPath = new File(System.getProperty("java.io.tmpdir")); 
+		File tempPath = new File(System.getProperty("java.io.tmpdir"));
 		
-		FileIOChannel.Enumerator enumerator = ioManager.createChannelEnumerator();
+		String[] tempDirs = new String[] {
+			new File(tempPath, "a").getAbsolutePath(),
+			new File(tempPath, "b").getAbsolutePath(),
+			new File(tempPath, "c").getAbsolutePath(),
+			new File(tempPath, "d").getAbsolutePath(),
+			new File(tempPath, "e").getAbsolutePath(),
+		};
+		
+		int[] counters = new int[tempDirs.length];
+		
+		
+		FileIOChannel.Enumerator enumerator = new TestIOManager(tempDirs).createChannelEnumerator();
 
-		for (int i = 0; i < 10; i++) {
+		for (int i = 0; i < 3 * tempDirs.length; i++) {
 			FileIOChannel.ID id = enumerator.next();
 			
 			File path = new File(id.getPath());
-			Assert.assertTrue("Channel IDs must name an absolute path.", path.isAbsolute());
-			Assert.assertFalse("Channel IDs must name a file, not a directory.", path.isDirectory());
-			Assert.assertTrue("Path is not in the temp directory.", tempPath.equals(path.getParentFile()));
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void channelReadWriteOneSegment() {
-		final int NUM_IOS = 1111;
-		
-		try {
-			final FileIOChannel.ID channelID = this.ioManager.createChannel();
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
 			
-			MemorySegment memSeg = this.memoryManager.allocatePages(new DummyInvokable(), 1).get(0);
+			assertTrue("Channel IDs must name an absolute path.", path.isAbsolute());
 			
-			for (int i = 0; i < NUM_IOS; i++) {
-				for (int pos = 0; pos < memSeg.size(); pos += 4) {
-					memSeg.putInt(pos, i);
-				}
-				
-				writer.writeBlock(memSeg);
-				memSeg = writer.getNextReturnedSegment();
-			}
+			assertFalse("Channel IDs must name a file, not a directory.", path.isDirectory());
 			
-			writer.close();
+			assertTrue("Path is not in the temp directory.", tempPath.equals(path.getParentFile().getParentFile()));
 			
-			final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channelID);
-			for (int i = 0; i < NUM_IOS; i++) {
-				reader.readBlock(memSeg);
-				memSeg = reader.getNextReturnedSegment();
-				
-				for (int pos = 0; pos < memSeg.size(); pos += 4) {
-					if (memSeg.getInt(pos) != i) {
-						Assert.fail("Read memory segment contains invalid data.");
-					}
+			for (int k = 0; k < tempDirs.length; k++) {
+				if (path.getParent().equals(tempDirs[k])) {
+					counters[k]++;
 				}
 			}
-			
-			reader.closeAndDelete();
-			
-			this.memoryManager.release(memSeg);
-			
-		} catch (Exception ex) {
-			ex.printStackTrace();
-			Assert.fail("TEst encountered an exception: " + ex.getMessage());
 		}
-	}
-	
-	@Test
-	public void channelReadWriteMultipleSegments() {
-		final int NUM_IOS = 1111;
-		final int NUM_SEGS = 16;
 		
-		try {
-			final List<MemorySegment> memSegs = this.memoryManager.allocatePages(new DummyInvokable(), NUM_SEGS);
-			final FileIOChannel.ID channelID = this.ioManager.createChannel();
-			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
-			
-			for (int i = 0; i < NUM_IOS; i++) {
-				final MemorySegment memSeg = memSegs.isEmpty() ? writer.getNextReturnedSegment() : memSegs.remove(0);
-				
-				for (int pos = 0; pos < memSeg.size(); pos += 4) {
-					memSeg.putInt(pos, i);
-				}
-				
-				writer.writeBlock(memSeg);
-			}
-			writer.close();
-			
-			// get back the memory
-			while (memSegs.size() < NUM_SEGS) {
-				memSegs.add(writer.getNextReturnedSegment());
-			}
-			
-			final BlockChannelReader reader = this.ioManager.createBlockChannelReader(channelID);
-			while(!memSegs.isEmpty()) {
-				reader.readBlock(memSegs.remove(0));
-			}
-			
-			for (int i = 0; i < NUM_IOS; i++) {
-				final MemorySegment memSeg = reader.getNextReturnedSegment();
-				
-				for (int pos = 0; pos < memSeg.size(); pos += 4) {
-					if (memSeg.getInt(pos) != i) {
-						Assert.fail("Read memory segment contains invalid data.");
-					}
-				}
-				reader.readBlock(memSeg);
-			}
-			
-			reader.closeAndDelete();
-			
-			// get back the memory
-			while (memSegs.size() < NUM_SEGS) {
-				memSegs.add(reader.getNextReturnedSegment());
-			}
-			
-			this.memoryManager.release(memSegs);
-			
-		} catch (Exception ex) {
-			ex.printStackTrace();
-			Assert.fail("TEst encountered an exception: " + ex.getMessage());
+		for (int k = 0; k < tempDirs.length; k++) {
+			assertEquals(3, counters[k]);
 		}
 	}
-
-	// ============================================================================================
 	
-	final class FailingSegmentReadRequest implements ReadRequest {
-		
-		private final AsynchronousFileIOChannel<MemorySegment, ReadRequest> channel;
-		
-		private final MemorySegment segment;
-		
-		protected FailingSegmentReadRequest(AsynchronousFileIOChannel<MemorySegment, ReadRequest> targetChannel, MemorySegment segment) {
-			this.channel = targetChannel;
-			this.segment = segment;
+	// --------------------------------------------------------------------------------------------
+	
+	private static class TestIOManager extends IOManager {
+
+		protected TestIOManager(String[] paths) {
+			super(paths);
 		}
 
+		@Override
+		public void shutdown() {}
 
 		@Override
-		public void read() throws IOException {
-			throw new TestIOException();
+		public boolean isProperlyShutDown() {
+			return false;
 		}
 
-
 		@Override
-		public void requestDone(IOException ioex) {
-			this.channel.handleProcessedBuffer(this.segment, ioex);
+		public BlockChannelWriter createBlockChannelWriter(ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) {
+			throw new UnsupportedOperationException();
 		}
-	}
 
-	//--------------------------------------------------------------------------------------------
-
-	/**
-	 * Special write request that writes an entire memory segment to the block writer.
-	 */
-	final class FailingSegmentWriteRequest implements WriteRequest {
-		
-		private final AsynchronousFileIOChannel<MemorySegment, WriteRequest> channel;
-		
-		private final MemorySegment segment;
-		
-		protected FailingSegmentWriteRequest(AsynchronousFileIOChannel<MemorySegment, WriteRequest> targetChannel, MemorySegment segment) {
-			this.channel = targetChannel;
-			this.segment = segment;
+		@Override
+		public BlockChannelWriterWithCallback createBlockChannelWriter(ID channelID, RequestDoneCallback callback) {
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public void write() throws IOException {
-			throw new TestIOException();
+		public BlockChannelReader createBlockChannelReader(ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) {
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public void requestDone(IOException ioex) {
-			this.channel.handleProcessedBuffer(this.segment, ioex);
+		public BulkBlockChannelReader createBulkBlockChannelReader(ID channelID, List<MemorySegment> targetSegments, int numBlocks) {
+			throw new UnsupportedOperationException();
 		}
 	}
-	
-	
-	final class TestIOException extends IOException {
-		private static final long serialVersionUID = -814705441998024472L;
-	}
-}
+}
\ No newline at end of file

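The rewritten channelEnumerator test checks that channel IDs are spread round-robin over per-directory subfolders of the configured temp directories. Obtaining IDs from an enumerator looks roughly like the sketch below; only calls that appear in the test are used, and the default temp-directory setup and loop bound are arbitrary.

import java.io.File;

import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;

public class ChannelEnumeratorSketch {

    public static void main(String[] args) {
        IOManagerAsync ioMan = new IOManagerAsync();
        try {
            // an enumerator hands out channel IDs, rotating over the configured temp directories
            FileIOChannel.Enumerator enumerator = ioMan.createChannelEnumerator();
            for (int i = 0; i < 4; i++) {
                FileIOChannel.ID id = enumerator.next();
                System.out.println(new File(id.getPath()).getParent());
            }
        }
        finally {
            ioMan.shutdown();
        }
    }
}
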
http://git-wip-us.apache.org/repos/asf/flink/blob/996d404c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/PairGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/PairGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/PairGenerator.java
new file mode 100644
index 0000000..951a661
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/PairGenerator.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.Value;
+
+public final class PairGenerator {
+
+	public static class Pair implements Value {
+		
+		private static final long serialVersionUID = 1L;
+		
+		private int key;
+		private StringValue value = new StringValue();
+		
+		
+		public Pair() {}
+		
+		
+		public int getKey() {
+			return key;
+		}
+		
+		public StringValue getValue() {
+			return value;
+		}
+		
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(key);
+			value.write(out);
+		}
+		
+		@Override
+		public void read(DataInputView in) throws IOException {
+			key = in.readInt();
+			value.read(in);
+		}
+		
+		@Override
+		public int hashCode() {
+			return 31 * key + value.hashCode();
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof Pair) {
+				Pair other = (Pair) obj;
+				return other.key == this.key && other.value.equals(this.value);
+			} else {
+				return false;
+			}
+		}
+		
+		@Override
+		public String toString() {
+			return String.format("(%d, %s)", key, value);
+		}
+	}
+	
+	public enum KeyMode {
+		SORTED, RANDOM
+	};
+
+	public enum ValueMode {
+		FIX_LENGTH, RANDOM_LENGTH, CONSTANT
+	};
+
+	private static char[] alpha = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c',
+		'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm' };
+
+	private final long seed;
+
+	private final int keyMax;
+
+	private final int valueLength;
+
+	private final KeyMode keyMode;
+
+	private final ValueMode valueMode;
+
+	private Random random;
+
+	private int counter;
+
+	private final StringValue valueConstant;
+
+	
+	public PairGenerator(long seed, int keyMax, int valueLength) {
+		this(seed, keyMax, valueLength, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+	}
+
+	public PairGenerator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode) {
+		this(seed, keyMax, valueLength, keyMode, valueMode, null);
+	}
+	
+	public PairGenerator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode, String constant) {
+		this.seed = seed;
+		this.keyMax = keyMax;
+		this.valueLength = valueLength;
+		this.keyMode = keyMode;
+		this.valueMode = valueMode;
+
+		this.random = new Random(seed);
+		this.counter = 0;
+		
+		this.valueConstant = new StringValue();
+		if (constant != null) {
+			this.valueConstant.setValue(constant);
+		}
+	}
+
+	public void next(Pair target) {
+		target.key = (keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1);
+		
+		if (valueMode == ValueMode.CONSTANT) {
+			target.value = valueConstant;
+		} else {
+			randomString(target.value);
+		}
+	}
+
+	public void reset() {
+		this.random = new Random(seed);
+		this.counter = 0;
+	}
+
+	private void randomString(StringValue target) {
+		
+		int length = valueMode == ValueMode.FIX_LENGTH ?
+			valueLength :
+			valueLength - random.nextInt(valueLength / 3);
+
+		target.setLength(0);
+		for (int i = 0; i < length; i++) {
+			target.append(alpha[random.nextInt(alpha.length)]);
+		}
+	}
+}

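Finally, a short usage sketch for the new PairGenerator test utility. The seed and parameter values below are arbitrary; the constructor, next(Pair), and reset() follow the class as added above.

import org.apache.flink.runtime.operators.testutils.PairGenerator;
import org.apache.flink.runtime.operators.testutils.PairGenerator.KeyMode;
import org.apache.flink.runtime.operators.testutils.PairGenerator.Pair;
import org.apache.flink.runtime.operators.testutils.PairGenerator.ValueMode;

public class PairGeneratorSketch {

    public static void main(String[] args) {
        // deterministic generator: the same seed yields the same sequence after reset()
        PairGenerator generator =
                new PairGenerator(12345L, 100, 20, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
        Pair pair = new Pair();

        for (int i = 0; i < 5; i++) {
            generator.next(pair);       // fills key and value in place
            System.out.println(pair);   // e.g. "(42, abcdef...)"
        }

        generator.reset();              // replays the exact same sequence
    }
}
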
