flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-1483] IOManager puts temp files in dedicated directory and removes that on shutdown
Date Thu, 19 Feb 2015 10:54:20 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.8 af50e5619 -> 8038d227d


[FLINK-1483] IOManager puts temp files in dedicated directory and removes that on shutdown


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8038d227
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8038d227
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8038d227

Branch: refs/heads/release-0.8
Commit: 8038d227d53d857cfc2b1af701e719d71ba7f091
Parents: af50e56
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Feb 18 15:03:25 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 19 10:53:13 2015 +0100

----------------------------------------------------------------------
 .../io/disk/iomanager/FileIOChannel.java        |  34 ++--
 .../runtime/io/disk/iomanager/IOManager.java    | 156 ++++++++++++++-----
 .../io/disk/iomanager/IOManagerAsync.java       |  58 +++----
 .../io/disk/iomanager/IOManagerTest.java        | 110 +++++++++----
 4 files changed, 251 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8038d227/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index 7c9d31b..d6c1ce0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -77,24 +77,33 @@ public interface FileIOChannel {
 		
 		private static final int RANDOM_BYTES_LENGTH = 16;
 		
-		private final String path;
+		private final File path;
 		
 		private final int threadNum;
 
-		protected ID(String path, int threadNum) {
+		protected ID(File path, int threadNum) {
 			this.path = path;
 			this.threadNum = threadNum;
 		}
 
-		protected ID(String basePath, int threadNum, Random random) {
-			this.path = basePath + File.separator + randomString(random) + ".channel";
+		protected ID(File basePath, int threadNum, Random random) {
+			this.path = new File(basePath, randomString(random) + ".channel");
 			this.threadNum = threadNum;
 		}
 
 		/**
 		 * Returns the path to the underlying temporary file.
+		 * @return The path to the underlying temporary file.
 		 */
 		public String getPath() {
+			return path.getAbsolutePath();
+		}
+
+		/**
+		 * Returns the path to the underlying temporary file as a File.
+		 * @return The path to the underlying temporary file as a File.
+		 */
+		public File getPathFile() {
 			return path;
 		}
 		
@@ -119,11 +128,11 @@ public interface FileIOChannel {
 		
 		@Override
 		public String toString() {
-			return path;
+			return path.getAbsolutePath();
 		}
 		
-		private static final String randomString(final Random random) {
-			final byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
+		private static String randomString(Random random) {
+			byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
 			random.nextBytes(bytes);
 			return StringUtils.byteToHexString(bytes);
 		}
@@ -133,24 +142,23 @@ public interface FileIOChannel {
 	 * An enumerator for channels that logically belong together.
 	 */
 	public static final class Enumerator {
-		
-		private static final String FORMAT = "%s%s%s.%06d.channel";
 
-		private final String[] paths;
+		private final File[] paths;
 		
 		private final String namePrefix;
 
 		private int counter;
 
-		protected Enumerator(String[] basePaths, Random random) {
+		protected Enumerator(File[] basePaths, Random random) {
 			this.paths = basePaths;
 			this.namePrefix = ID.randomString(random);
 			this.counter = 0;
 		}
 
 		public ID next() {
-			final int threadNum = counter % paths.length;
-			return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
+			int threadNum = counter % paths.length;
+			String filename = String.format("%s.%06d.channel", namePrefix, (counter++));
+			return new ID(new File(paths[threadNum], filename), threadNum);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8038d227/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index f77d9c4..49f697e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -18,68 +18,134 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.core.memory.MemorySegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.memory.MemorySegment;
-
 /**
  * The facade for the provided I/O manager services.
  */
 public abstract class IOManager {
-	
+
 	/** Logging */
 	protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
 	/** The temporary directories for files */
-	private final String[] paths;
+	private final File[] paths;
 
 	/** A random number generator for the anonymous ChannelIDs. */
 	private final Random random;
-	
+
 	/** The number of the next path to use. */
 	private volatile int nextPath;
-	
+
+	/** Shutdown hook to make sure that the directories are removed on exit */
+	private final Thread shutdownHook;
+
 	// -------------------------------------------------------------------------
 	//               Constructors / Destructors
 	// -------------------------------------------------------------------------
 
 	/**
 	 * Constructs a new IOManager.
-	 * 
-	 * @param paths
-	 *        the basic directory paths for files underlying anonymous channels.
+	 *
+	 * @param tempDirs The basic directories for files underlying anonymous channels.
 	 */
-	protected IOManager(String[] paths) {
-		this.paths = paths;
+	protected IOManager(String[] tempDirs) {
+		if (tempDirs == null || tempDirs.length == 0) {
+			throw new IllegalArgumentException("The temporary directories must not be null or empty.");
+		}
+
 		this.random = new Random();
 		this.nextPath = 0;
+
+		this.paths = new File[tempDirs.length];
+		for (int i = 0; i < tempDirs.length; i++) {
+			File baseDir = new File(tempDirs[i]);
+			String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString());
+			File storageDir = new File(baseDir, subfolder);
+
+			if (!storageDir.exists() && !storageDir.mkdirs()) {
+				throw new RuntimeException(
+						"Could not create storage directory for IOManager: " + storageDir.getAbsolutePath());
+			}
+			paths[i] = storageDir;
+			LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath());
+		}
+
+		this.shutdownHook = new Thread("I/O manager shutdown hook") {
+			@Override
+			public void run() {
+				shutdown();
+			}
+		};
+		Runtime.getRuntime().addShutdownHook(this.shutdownHook);
 	}
 
 	/**
-	 * Close method, marks the I/O manager as closed.
+	 * Close method, marks the I/O manager as closed
+	 * and removes all temporary files.
 	 */
-	public abstract void shutdown();
-	
+	public void shutdown() {
+		// remove all of our temp directories
+		for (File path : paths) {
+			try {
+				if (path != null) {
+					if (path.exists()) {
+						FileUtils.deleteDirectory(path);
+						LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath());
+					}
+				}
+			} catch (Throwable t) {
+				LOG.error("IOManager failed to properly clean up temp file directory: " + path, t);
+			}
+		}
+
+		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
+		if (shutdownHook != Thread.currentThread()) {
+			try {
+				Runtime.getRuntime().removeShutdownHook(shutdownHook);
+			}
+			catch (IllegalStateException e) {
+				// race, JVM is in shutdown already, we can safely ignore this
+			}
+			catch (Throwable t) {
+				LOG.warn("Exception while unregistering IOManager's shutdown hook.", t);
+			}
+		}
+	}
+
 	/**
 	 * Utility method to check whether the IO manager has been properly shut down.
-	 * 
+	 * For this base implementation, this means that all files have been removed.
+	 *
 	 * @return True, if the IO manager has properly shut down, false otherwise.
 	 */
-	public abstract boolean isProperlyShutDown();
+	public boolean isProperlyShutDown() {
+		for (File path : paths) {
+			if (path != null && path.exists()) {
+				return false;
+			}
+		}
+		return true;
+	}
 
 	// ------------------------------------------------------------------------
 	//                          Channel Instantiations
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates a new {@link FileIOChannel.ID} in one of the temp directories. Multiple
 	 * invocations of this method spread the channels evenly across the different directories.
-	 * 
+	 *
 	 * @return A channel to a temporary directory.
 	 */
 	public FileIOChannel.ID createChannel() {
@@ -90,22 +156,36 @@ public abstract class IOManager {
 	/**
 	 * Creates a new {@link FileIOChannel.Enumerator}, spreading the channels in a round-robin fashion
 	 * across the temporary file directories.
-	 * 
+	 *
 	 * @return An enumerator for channels.
 	 */
 	public FileIOChannel.Enumerator createChannelEnumerator() {
 		return new FileIOChannel.Enumerator(this.paths, this.random);
 	}
 
+	/**
+	 * Deletes the file underlying the given channel. If the channel is still open, this
+	 * call may fail.
+	 * 
+	 * @param channel The channel to be deleted.
+	 * @throws IOException Thrown if the deletion fails.
+	 */
+	public void deleteChannel(FileIOChannel.ID channel) throws IOException {
+		if (channel != null) {
+			if (channel.getPathFile().exists() && !channel.getPathFile().delete()) {
+				LOG.warn("IOManager failed to delete temporary file {}", channel.getPath());
+			}
+		}
+	}
 	
 	// ------------------------------------------------------------------------
 	//                        Reader / Writer instantiations
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Creates a block channel writer that writes to the given channel. The writer adds the
 	 * written segment to its return-queue afterwards (to allow for asynchronous implementations).
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to write to.
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
@@ -113,11 +193,11 @@ public abstract class IOManager {
 	public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException {
 		return createBlockChannelWriter(channelID, new LinkedBlockingQueue<MemorySegment>());
 	}
-	
+
 	/**
 	 * Creates a block channel writer that writes to the given channel. The writer adds the
 	 * written segment to the given queue (to allow for asynchronous implementations).
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to write to.
 	 * @param returnQueue The queue to put the written buffers into.
 	 * @return A block channel writer that writes to the given channel.
@@ -125,24 +205,24 @@ public abstract class IOManager {
 	 */
 	public abstract BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
 				LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
-	
+
 	/**
 	 * Creates a block channel writer that writes to the given channel. The writer calls the given callback
 	 * after the I/O operation has been performed (successfully or unsuccessfully), to allow
 	 * for asynchronous implementations.
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to write to.
-	 * @param callback The callback to be called for 
+	 * @param callback The callback to be called for
 	 * @return A block channel writer that writes to the given channel.
 	 * @throws IOException Thrown, if the channel for the writer could not be opened.
 	 */
 	public abstract BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException;
-	
+
 	/**
 	 * Creates a block channel reader that reads blocks from the given channel. The reader pushed
 	 * full memory segments (with the read data) to its "return queue", to allow for asynchronous read
 	 * implementations.
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to write to.
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
@@ -150,35 +230,35 @@ public abstract class IOManager {
 	public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID) throws IOException {
 		return createBlockChannelReader(channelID, new LinkedBlockingQueue<MemorySegment>());
 	}
-	
+
 	/**
 	 * Creates a block channel reader that reads blocks from the given channel. The reader pushes the full segments
 	 * to the given queue, to allow for asynchronous implementations.
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to write to.
 	 * @param returnQueue The queue to put the full buffers into.
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, 
+	public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
 										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException;
-	
+
 	/**
 	 * Creates a block channel reader that reads all blocks from the given channel directly in one bulk.
 	 * The reader draws segments to read the blocks into from a supplied list, which must contain as many
-	 * segments as the channel has blocks. After the reader is done, the list with the full segments can be 
+	 * segments as the channel has blocks. After the reader is done, the list with the full segments can be
 	 * obtained from the reader.
 	 * <p>
-	 * If a channel is not to be read in one bulk, but in multiple smaller batches, a  
+	 * If a channel is not to be read in one bulk, but in multiple smaller batches, a
 	 * {@link BlockChannelReader} should be used.
-	 * 
+	 *
 	 * @param channelID The descriptor for the channel to write to.
 	 * @param targetSegments The list to take the segments from into which to read the data.
 	 * @param numBlocks The number of blocks in the channel to read.
 	 * @return A block channel reader that reads from the given channel.
 	 * @throws IOException Thrown, if the channel for the reader could not be opened.
 	 */
-	public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, 
+	public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
 			List<MemorySegment> targetSegments, int numBlocks) throws IOException;
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8038d227/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 1f79067..ca16ddb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
  * A version of the {@link IOManager} that uses asynchronous I/O.
@@ -38,12 +39,9 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 
 	/** The reader threads used for asynchronous block oriented channel reading. */
 	private final ReaderThread[] readers;
-	
-	/** Lock object to guard shutdown */
-	private final Object shutdownLock = new Object();
-	
-	/** Flag to mark the I/O manager as alive or shut down */
-	private volatile boolean shutdown;
+
+	/** Flag to signify that the IOManager has been shut down already */
+	private final AtomicBoolean isShutdown = new AtomicBoolean();
 	
 	// -------------------------------------------------------------------------
 	//               Constructors / Destructors
@@ -103,13 +101,12 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	 */
 	@Override
 	public void shutdown() {
-		synchronized (shutdownLock) {
-			if (shutdown) {
-				return;
-			}
-			
-			shutdown = true;
-			
+		// mark shut down and exit if it already was shut down
+		if (!isShutdown.compareAndSet(false, true)) {
+			return;
+		}
+
+		try {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Shutting down I/O manager.");
 			}
@@ -141,7 +138,14 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 					rt.join();
 				}
 			}
-			catch (InterruptedException iex) {}
+			catch (InterruptedException iex) {
+				// ignore this on shutdown
+			}
+		}
+		finally {
+			// make sure we call the super implementation in any case and at the last point,
+			// because this will clean up the I/O directories
+			super.shutdown();
 		}
 	}
 	
@@ -160,17 +164,17 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 		
 		boolean writersShutDown = true;
 		for (WriterThread wt : writers) {
-			readersShutDown &= wt.getState() == Thread.State.TERMINATED;
+			writersShutDown &= wt.getState() == Thread.State.TERMINATED;
 		}
 		
-		return shutdown && writersShutDown && readersShutDown;
+		return isShutdown.get() && readersShutDown && writersShutDown && super.isProperlyShutDown();
 	}
 
 
 	@Override
 	public void uncaughtException(Thread t, Throwable e) {
-		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
-		shutdown();	
+		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Shutting down I/O Manager.", e);
+		shutdown();
 	}
 	
 	// ------------------------------------------------------------------------
@@ -181,13 +185,13 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
 								LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
 	{
-		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		checkState(!isShutdown.get(), "I/O-Manger is shut down.");
 		return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
 	}
 	
 	@Override
 	public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException {
-		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		checkState(!isShutdown.get(), "I/O-Manger is shut down.");
 		return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
 	}
 	
@@ -205,7 +209,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
 										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
 	{
-		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		checkState(!isShutdown.get(), "I/O-Manger is shut down.");
 		return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
 	}
 	
@@ -228,7 +232,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
 			List<MemorySegment> targetSegments,	int numBlocks) throws IOException
 	{
-		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		checkState(!isShutdown.get(), "I/O-Manger is shut down.");
 		return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
 	}
 
@@ -446,4 +450,4 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 		}
 		
 	}; // end writer thread
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8038d227/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index fc6ea37..55a53d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -16,29 +16,22 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-
-import org.junit.Assert;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
-import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
-import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.ReadRequest;
-import org.apache.flink.runtime.io.disk.iomanager.WriteRequest;
 import org.apache.flink.runtime.memory.DefaultMemoryManagerTest.DummyInvokable;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class IOManagerTest {
 	
 	// ------------------------------------------------------------------------
@@ -62,9 +55,9 @@ public class IOManagerTest {
 	@After
 	public void afterTest() {
 		this.ioManager.shutdown();
-		Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
+		assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
 		
-		Assert.assertTrue("Not all memory was returned to the memory manager in the test.", this.memoryManager.verifyEmpty());
+		assertTrue("Not all memory was returned to the memory manager in the test.", this.memoryManager.verifyEmpty());
 		this.memoryManager.shutdown();
 		this.memoryManager = null;
 	}
@@ -75,22 +68,52 @@ public class IOManagerTest {
 	
 	// ------------------------------------------------------------------------
 
-	/**
-	 * Tests that the channel enumerator creates channels in the temporary files directory.
-	 */
 	@Test
 	public void channelEnumerator() {
-		File tempPath = new File(System.getProperty("java.io.tmpdir")); 
-		
-		FileIOChannel.Enumerator enumerator = ioManager.createChannelEnumerator();
+		IOManager ioMan = null;
 
-		for (int i = 0; i < 10; i++) {
-			FileIOChannel.ID id = enumerator.next();
-			
-			File path = new File(id.getPath());
-			Assert.assertTrue("Channel IDs must name an absolute path.", path.isAbsolute());
-			Assert.assertFalse("Channel IDs must name a file, not a directory.", path.isDirectory());
-			Assert.assertTrue("Path is not in the temp directory.", tempPath.equals(path.getParentFile()));
+		try {
+			File tempPath = new File(System.getProperty("java.io.tmpdir"));
+
+			String[] tempDirs = new String[]{
+					new File(tempPath, "a").getAbsolutePath(),
+					new File(tempPath, "b").getAbsolutePath(),
+					new File(tempPath, "c").getAbsolutePath(),
+					new File(tempPath, "d").getAbsolutePath(),
+					new File(tempPath, "e").getAbsolutePath(),
+			};
+
+			int[] counters = new int[tempDirs.length];
+
+			ioMan = new TestIOManager(tempDirs);
+			FileIOChannel.Enumerator enumerator = ioMan.createChannelEnumerator();
+
+			for (int i = 0; i < 3 * tempDirs.length; i++) {
+				FileIOChannel.ID id = enumerator.next();
+
+				File path = id.getPathFile();
+
+				assertTrue("Channel IDs must name an absolute path.", path.isAbsolute());
+				assertFalse("Channel IDs must name a file, not a directory.", path.isDirectory());
+
+				assertTrue("Path is not in the temp directory.",
+						tempPath.equals(path.getParentFile().getParentFile().getParentFile()));
+
+				for (int k = 0; k < tempDirs.length; k++) {
+					if (path.getParentFile().getParent().equals(tempDirs[k])) {
+						counters[k]++;
+					}
+				}
+			}
+
+			for (int k = 0; k < tempDirs.length; k++) {
+				assertEquals(3, counters[k]);
+			}
+		}
+		finally {
+			if (ioMan != null) {
+				ioMan.shutdown();
+			}
 		}
 	}
 
@@ -124,7 +147,7 @@ public class IOManagerTest {
 				
 				for (int pos = 0; pos < memSeg.size(); pos += 4) {
 					if (memSeg.getInt(pos) != i) {
-						Assert.fail("Read memory segment contains invalid data.");
+						fail("Read memory segment contains invalid data.");
 					}
 				}
 			}
@@ -135,7 +158,7 @@ public class IOManagerTest {
 			
 		} catch (Exception ex) {
 			ex.printStackTrace();
-			Assert.fail("TEst encountered an exception: " + ex.getMessage());
+			fail("TEst encountered an exception: " + ex.getMessage());
 		}
 	}
 	
@@ -175,7 +198,7 @@ public class IOManagerTest {
 				
 				for (int pos = 0; pos < memSeg.size(); pos += 4) {
 					if (memSeg.getInt(pos) != i) {
-						Assert.fail("Read memory segment contains invalid data.");
+						fail("Read memory segment contains invalid data.");
 					}
 				}
 				reader.readBlock(memSeg);
@@ -192,7 +215,7 @@ public class IOManagerTest {
 			
 		} catch (Exception ex) {
 			ex.printStackTrace();
-			Assert.fail("TEst encountered an exception: " + ex.getMessage());
+			fail("TEst encountered an exception: " + ex.getMessage());
 		}
 	}
 
@@ -253,4 +276,33 @@ public class IOManagerTest {
 	final class TestIOException extends IOException {
 		private static final long serialVersionUID = -814705441998024472L;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	private static class TestIOManager extends IOManager {
+
+		protected TestIOManager(String[] paths) {
+			super(paths);
+		}
+
+		@Override
+		public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, List<MemorySegment> targetSegments, int numBlocks) throws IOException {
+			throw new UnsupportedOperationException();
+		}
+	}
 }


Mime
View raw message