flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/3] incubator-flink git commit: [FLINK-1323] Refactor I/O Manager Readers and Writers to interfaces, add implementation that uses callbacks on completed write requests.
Date Tue, 11 Nov 2014 10:48:48 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 8e4c772ad -> c9cfe3ba9


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
index d5c5760..78951d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerITCase.java
@@ -64,7 +64,7 @@ public class IOManagerITCase {
 	@Before
 	public void beforeTest() {
 		memoryManager = new DefaultMemoryManager(NUMBER_OF_SEGMENTS * SEGMENT_SIZE, 1);
-		ioManager = new IOManager();
+		ioManager = new IOManagerAsync();
 	}
 
 	@After
@@ -91,7 +91,7 @@ public class IOManagerITCase {
 		final Random rnd = new Random(SEED);
 		final AbstractInvokable memOwner = new DefaultMemoryManagerTest.DummyInvokable();
 		
-		Channel.ID[] ids = new Channel.ID[NUM_CHANNELS];
+		FileIOChannel.ID[] ids = new FileIOChannel.ID[NUM_CHANNELS];
 		BlockChannelWriter[] writers = new BlockChannelWriter[NUM_CHANNELS];
 		BlockChannelReader[] readers = new BlockChannelReader[NUM_CHANNELS];
 		ChannelWriterOutputView[] outs = new ChannelWriterOutputView[NUM_CHANNELS];

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
index ed3a28a..9c129e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerPerformanceBenchmark.java
@@ -41,7 +41,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -57,8 +57,8 @@ import org.junit.Test;
  *
  *
  */
-public class IOManagerPerformanceBenchmark
-{
+public class IOManagerPerformanceBenchmark {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);
 	
 	private static final int[] SEGMENT_SIZES_ALIGNED = { 4096, 16384, 524288 };
@@ -80,10 +80,9 @@ public class IOManagerPerformanceBenchmark
 	
 	
 	@Before
-	public void startup()
-	{
+	public void startup() {
 		memManager = new DefaultMemoryManager(MEMORY_SIZE,1);
-		ioManager = new IOManager();
+		ioManager = new IOManagerAsync();
 	}
 	
 	@After
@@ -111,7 +110,7 @@ public class IOManagerPerformanceBenchmark
 	private void testChannelWithSegments(int numSegments) throws Exception
 	{
 		final List<MemorySegment> memory = this.memManager.allocatePages(memoryOwner, numSegments);
-		final Channel.ID channel = this.ioManager.createChannel();
+		final FileIOChannel.ID channel = this.ioManager.createChannel();
 		
 		BlockChannelWriter writer = null;
 		BlockChannelReader reader = null;
@@ -246,7 +245,7 @@ public class IOManagerPerformanceBenchmark
 	}
 		
 	private void speedTestStream(int bufferSize) throws IOException {
-		final Channel.ID tmpChannel = ioManager.createChannel();
+		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
 		final IntegerRecord rec = new IntegerRecord(0);
 		
 		File tempFile = null;
@@ -342,7 +341,7 @@ public class IOManagerPerformanceBenchmark
 	@SuppressWarnings("resource")
 	private void speedTestNIO(int bufferSize, boolean direct) throws IOException
 	{
-		final Channel.ID tmpChannel = ioManager.createChannel();
+		final FileIOChannel.ID tmpChannel = ioManager.createChannel();
 		
 		File tempFile = null;
 		FileChannel fs = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index 810b0e2..fc6ea37 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -26,10 +26,10 @@ import java.util.List;
 import org.junit.Assert;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelAccess;
+import org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
 import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.Channel;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.ReadRequest;
 import org.apache.flink.runtime.io.disk.iomanager.WriteRequest;
@@ -39,8 +39,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class IOManagerTest
-{
+public class IOManagerTest {
+	
 	// ------------------------------------------------------------------------
 	//                        Cross Test Fields
 	// ------------------------------------------------------------------------
@@ -54,15 +54,13 @@ public class IOManagerTest
 	// ------------------------------------------------------------------------
 	
 	@Before
-	public void beforeTest()
-	{
+	public void beforeTest() {
 		this.memoryManager = new DefaultMemoryManager(32 * 1024 * 1024, 1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
-	public void afterTest()
-	{
+	public void afterTest() {
 		this.ioManager.shutdown();
 		Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
 		
@@ -84,10 +82,10 @@ public class IOManagerTest
 	public void channelEnumerator() {
 		File tempPath = new File(System.getProperty("java.io.tmpdir")); 
 		
-		Channel.Enumerator enumerator = ioManager.createChannelEnumerator();
+		FileIOChannel.Enumerator enumerator = ioManager.createChannelEnumerator();
 
 		for (int i = 0; i < 10; i++) {
-			Channel.ID id = enumerator.next();
+			FileIOChannel.ID id = enumerator.next();
 			
 			File path = new File(id.getPath());
 			Assert.assertTrue("Channel IDs must name an absolute path.", path.isAbsolute());
@@ -99,12 +97,11 @@ public class IOManagerTest
 	// ------------------------------------------------------------------------
 	
 	@Test
-	public void channelReadWriteOneSegment()
-	{
+	public void channelReadWriteOneSegment() {
 		final int NUM_IOS = 1111;
 		
 		try {
-			final Channel.ID channelID = this.ioManager.createChannel();
+			final FileIOChannel.ID channelID = this.ioManager.createChannel();
 			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
 			
 			MemorySegment memSeg = this.memoryManager.allocatePages(new DummyInvokable(), 1).get(0);
@@ -143,14 +140,13 @@ public class IOManagerTest
 	}
 	
 	@Test
-	public void channelReadWriteMultipleSegments()
-	{
+	public void channelReadWriteMultipleSegments() {
 		final int NUM_IOS = 1111;
 		final int NUM_SEGS = 16;
 		
 		try {
 			final List<MemorySegment> memSegs = this.memoryManager.allocatePages(new DummyInvokable(),
NUM_SEGS);
-			final Channel.ID channelID = this.ioManager.createChannel();
+			final FileIOChannel.ID channelID = this.ioManager.createChannel();
 			final BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
 			
 			for (int i = 0; i < NUM_IOS; i++) {
@@ -202,29 +198,26 @@ public class IOManagerTest
 
 	// ============================================================================================
 	
-	final class FailingSegmentReadRequest implements ReadRequest
-	{
-		private final BlockChannelAccess<ReadRequest, ?> channel;
+	final class FailingSegmentReadRequest implements ReadRequest {
+		
+		private final AsynchronousFileIOChannel<ReadRequest> channel;
 		
 		private final MemorySegment segment;
 		
-		protected FailingSegmentReadRequest(BlockChannelAccess<ReadRequest, ?> targetChannel,
MemorySegment segment)
-		{
+		protected FailingSegmentReadRequest(AsynchronousFileIOChannel<ReadRequest> targetChannel,
MemorySegment segment) {
 			this.channel = targetChannel;
 			this.segment = segment;
 		}
 
 
 		@Override
-		public void read() throws IOException
-		{
+		public void read() throws IOException {
 			throw new TestIOException();
 		}
 
 
 		@Override
-		public void requestDone(IOException ioex)
-		{
+		public void requestDone(IOException ioex) {
 			this.channel.handleProcessedBuffer(this.segment, ioex);
 		}
 	}
@@ -234,36 +227,30 @@ public class IOManagerTest
 	/**
 	 * Special write request that writes an entire memory segment to the block writer.
 	 */
-	final class FailingSegmentWriteRequest implements WriteRequest
-	{
-		private final BlockChannelAccess<WriteRequest, ?> channel;
+	final class FailingSegmentWriteRequest implements WriteRequest {
+		
+		private final AsynchronousFileIOChannel<WriteRequest> channel;
 		
 		private final MemorySegment segment;
 		
-		protected FailingSegmentWriteRequest(BlockChannelAccess<WriteRequest, ?> targetChannel,
MemorySegment segment)
-		{
+		protected FailingSegmentWriteRequest(AsynchronousFileIOChannel<WriteRequest> targetChannel,
MemorySegment segment) {
 			this.channel = targetChannel;
 			this.segment = segment;
 		}
 
-
 		@Override
-		public void write() throws IOException
-		{
+		public void write() throws IOException {
 			throw new TestIOException();
 		}
 
-
 		@Override
-		public void requestDone(IOException ioex)
-		{
+		public void requestDone(IOException ioex) {
 			this.channel.handleProcessedBuffer(this.segment, ioex);
 		}
 	}
 	
 	
-	final class TestIOException extends IOException
-	{
+	final class TestIOException extends IOException {
 		private static final long serialVersionUID = -814705441998024472L;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
index 2f30eed..bca6896 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
@@ -16,13 +16,11 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
-
 import org.apache.flink.api.common.functions.CrossFunction;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -32,8 +30,9 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.Record;
 import org.junit.Test;
 
-public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>>
-{
+@SuppressWarnings("deprecation")
+public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>>
{
+	
 	private static final long CROSS_MEM = 1024 * 1024;
 
 	private final double cross_frac;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
index 6e66e72..1e0e882 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
@@ -32,8 +32,9 @@ import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record,
Record, Record>>
-{
+@SuppressWarnings("deprecation")
+public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record,
Record, Record>> {
+	
 	private static final long HASH_MEM = 4*1024*1024;
 	
 	private static final long SORT_MEM = 3*1024*1024;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index a824f77..584bc02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -40,8 +40,9 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record,
Record>>
-{
+@SuppressWarnings("deprecation")
+public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record,
Record>> {
+	
 	private static final long HASH_MEM = 6*1024*1024;
 	
 	private static final long SORT_MEM = 3*1024*1024;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
index 6dfe82b..21e686d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashMatchIteratorITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -61,7 +62,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "deprecation"})
 public class HashMatchIteratorITCase {
 	
 	private static final int MEMORY_SIZE = 16000000;		// total memory
@@ -104,12 +105,11 @@ public class HashMatchIteratorITCase {
 		this.recordPairPairComparator = new RecordIntPairPairComparator();
 		
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
-	public void afterTest()
-	{
+	public void afterTest() {
 		if (this.ioManager != null) {
 			this.ioManager.shutdown();
 			if (!this.ioManager.isProperlyShutDown()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index 921151a..2cc9892 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
@@ -96,7 +97,7 @@ public class HashTableITCase {
 		this.pairComparator = new IntPairPairComparator();
 		
 		this.memManager = new DefaultMemoryManager(32 * 1024 * 1024,1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 	
 	@After
@@ -756,7 +757,7 @@ public class HashTableITCase {
 		}
 		
 		// create the I/O access for spilling
-		final IOManager ioManager = new IOManager();
+		final IOManager ioManager = new IOManagerAsync();
 		
 		// ----------------------------------------------------------------------------------------
 		
@@ -857,7 +858,7 @@ public class HashTableITCase {
 		}
 		
 		// create the I/O access for spilling
-		IOManager ioManager = new IOManager();
+		IOManager ioManager = new IOManagerAsync();
 		
 		// create the map for validating the results
 		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
@@ -972,7 +973,7 @@ public class HashTableITCase {
 		}
 		
 		// create the I/O access for spilling
-		IOManager ioManager = new IOManager();
+		IOManager ioManager = new IOManagerAsync();
 		
 		// create the map for validating the results
 		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index 0fbe98a..a8941a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
 import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
@@ -55,7 +56,7 @@ public class HashTablePerformanceComparison {
 	
 	private final TypePairComparator<IntPair, IntPair> pairComparator = new IntPairPairComparator();
 	
-	private IOManager ioManager = new IOManager();
+	private IOManager ioManager = new IOManagerAsync();
 	
 	@Test
 	public void testCompactingHashMapPerformance() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
index c1a906c..04d1a38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
@@ -68,7 +69,7 @@ import org.junit.Test;
  * Test specialized hash join that keeps the build side data (in memory and on hard disk)
  * This is used for iterative tasks.
  */
-
+@SuppressWarnings("deprecation")
 public class ReOpenableHashTableITCase {
 	
 	private static final int PAGE_SIZE = 8 * 1024;
@@ -120,7 +121,7 @@ public class ReOpenableHashTableITCase {
 		this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
 		
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1, PAGE_SIZE);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
index 2bc11f1..4db520e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableIteratorTest.java
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -56,7 +57,7 @@ public class SpillingResettableIteratorTest {
 	public void startup() {
 		// set up IO and memory manager
 		this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 32 * 1024);
-		this.ioman = new IOManager();
+		this.ioman = new IOManagerAsync();
 
 		// create test objects
 		ArrayList<IntValue> objects = new ArrayList<IntValue>(NUM_TESTRECORDS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
index 9eaae9a..10d3534 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/resettable/SpillingResettableMutableObjectIteratorTest.java
@@ -25,6 +25,7 @@ import org.junit.Assert;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -57,7 +58,7 @@ public class SpillingResettableMutableObjectIteratorTest {
 	public void startup() {
 		// set up IO and memory manager
 		this.memman = new DefaultMemoryManager(MEMORY_CAPACITY, 32 * 1024);
-		this.ioman = new IOManager();
+		this.ioman = new IOManagerAsync();
 
 		// create test objects
 		final ArrayList<Record> objects = new ArrayList<Record>(NUM_TESTRECORDS);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index cbceb8b..852a8f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -80,7 +81,7 @@ public class CombiningUnilateralSortMergerITCase {
 	@Before
 	public void beforeTest() {
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 		
 		this.serializerFactory = RecordSerializerFactory.get();
 		this.comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 8e4bbc0..a340ef2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -81,7 +82,7 @@ public class ExternalSortITCase {
 	@Before
 	public void beforeTest() {
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 		
 		this.pactRecordSerializer = RecordSerializerFactory.get();
 		this.pactRecordComparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
index 257eb87..9dec847 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
@@ -85,7 +86,7 @@ public class MassiveStringSortingITCase {
 			
 			try {
 				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManager();
+				IOManager ioMan = new IOManagerAsync();
 					
 				TypeSerializer<String> serializer = StringSerializer.INSTANCE;
 				TypeComparator<String> comparator = new StringComparator(true);
@@ -175,7 +176,7 @@ public class MassiveStringSortingITCase {
 			
 			try {
 				MemoryManager mm = new DefaultMemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManager();
+				IOManager ioMan = new IOManagerAsync();
 					
 				TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String,
String[]>>) (TupleTypeInfo<?>) TypeInfoParser.parse("Tuple2<String, String[]>");
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
index 766d66c..e12c4ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/SortMergeMatchIteratorITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializer;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -53,7 +54,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-
+@SuppressWarnings("deprecation")
 public class SortMergeMatchIteratorITCase {
 	
 	// total memory
@@ -85,8 +86,7 @@ public class SortMergeMatchIteratorITCase {
 
 	@SuppressWarnings("unchecked")
 	@Before
-	public void beforeTest()
-	{
+	public void beforeTest() {
 		this.serializer1 = RecordSerializer.get();
 		this.serializer2 = RecordSerializer.get();
 		this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
@@ -94,12 +94,11 @@ public class SortMergeMatchIteratorITCase {
 		this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
 		
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After
-	public void afterTest()
-	{
+	public void afterTest() {
 		if (this.ioManager != null) {
 			this.ioManager.shutdown();
 			if (!this.ioManager.isProperlyShutDown()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 4d04bf4..02206f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -92,7 +93,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S,
Re
 		
 		this.perSortMem = perSortMemory;
 		this.perSortFractionMem = (double)perSortMemory/totalMem;
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 		this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem,1) : null;
 		
 		this.inputs = new ArrayList<MutableObjectIterator<Record>>();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 5e0edc4..60a81c1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -33,6 +33,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.Buffer;
 import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
@@ -86,7 +87,7 @@ public class MockEnvironment implements Environment, BufferProvider, LocalBuffer
 		this.outputs = new LinkedList<OutputGate>();
 
 		this.memManager = new DefaultMemoryManager(memorySize, 1);
-		this.ioManager = new IOManager(System.getProperty("java.io.tmpdir"));
+		this.ioManager = new IOManagerAsync();
 		this.inputSplitProvider = inputSplitProvider;
 		this.mockBuffer = new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, null);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c9cfe3ba/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 744651d..ba38776 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.java.record.functions.JoinFunction;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
@@ -47,7 +48,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-
+@SuppressWarnings("deprecation")
 public class HashVsSortMiniBenchmark {
 	
 	// total memory
@@ -94,7 +95,7 @@ public class HashVsSortMiniBenchmark {
 		this.pairComparator11 = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]
{TestData.Key.class});
 		
 		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE, PAGE_SIZE);
-		this.ioManager = new IOManager();
+		this.ioManager = new IOManagerAsync();
 	}
 
 	@After


Mime
View raw message