Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 33013105AF for ; Thu, 19 Feb 2015 10:54:21 +0000 (UTC) Received: (qmail 40506 invoked by uid 500); 19 Feb 2015 10:54:21 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 40476 invoked by uid 500); 19 Feb 2015 10:54:21 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 40467 invoked by uid 99); 19 Feb 2015 10:54:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Feb 2015 10:54:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DD416E042E; Thu, 19 Feb 2015 10:54:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-1483] IOManager puts temp files in dedicated directory and removes that on shutdown Date: Thu, 19 Feb 2015 10:54:20 +0000 (UTC) Repository: flink Updated Branches: refs/heads/release-0.8 af50e5619 -> 8038d227d [FLINK-1483] IOManager puts temp files in dedicated directory and removes that on shutdown Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8038d227 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8038d227 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8038d227 Branch: refs/heads/release-0.8 Commit: 8038d227d53d857cfc2b1af701e719d71ba7f091 Parents: af50e56 Author: Stephan Ewen Authored: Wed Feb 18 15:03:25 2015 +0100 Committer: Stephan Ewen Committed: Thu Feb 19 10:53:13 2015 +0100 ---------------------------------------------------------------------- .../io/disk/iomanager/FileIOChannel.java | 34 ++-- .../runtime/io/disk/iomanager/IOManager.java | 156 ++++++++++++++----- .../io/disk/iomanager/IOManagerAsync.java | 58 +++---- .../io/disk/iomanager/IOManagerTest.java | 110 +++++++++---- 4 files changed, 251 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8038d227/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java index 7c9d31b..d6c1ce0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java @@ -77,24 +77,33 @@ public interface FileIOChannel { private static final int RANDOM_BYTES_LENGTH = 16; - private final String path; + private final File path; private final int threadNum; - protected ID(String path, int threadNum) { + protected ID(File path, int threadNum) { this.path = path; this.threadNum = threadNum; } - protected ID(String basePath, int threadNum, Random random) { - this.path = basePath + File.separator + randomString(random) + ".channel"; + protected ID(File basePath, int threadNum, Random random) { + this.path = new File(basePath, randomString(random) + ".channel"); this.threadNum = threadNum; } /** * Returns the path to the underlying temporary file. + * @return The path to the underlying temporary file.. */ public String getPath() { + return path.getAbsolutePath(); + } + + /** + * Returns the path to the underlying temporary file as a File. + * @return The path to the underlying temporary file as a File. + */ + public File getPathFile() { return path; } @@ -119,11 +128,11 @@ public interface FileIOChannel { @Override public String toString() { - return path; + return path.getAbsolutePath(); } - private static final String randomString(final Random random) { - final byte[] bytes = new byte[RANDOM_BYTES_LENGTH]; + private static String randomString(Random random) { + byte[] bytes = new byte[RANDOM_BYTES_LENGTH]; random.nextBytes(bytes); return StringUtils.byteToHexString(bytes); } @@ -133,24 +142,23 @@ public interface FileIOChannel { * An enumerator for channels that logically belong together. */ public static final class Enumerator { - - private static final String FORMAT = "%s%s%s.%06d.channel"; - private final String[] paths; + private final File[] paths; private final String namePrefix; private int counter; - protected Enumerator(String[] basePaths, Random random) { + protected Enumerator(File[] basePaths, Random random) { this.paths = basePaths; this.namePrefix = ID.randomString(random); this.counter = 0; } public ID next() { - final int threadNum = counter % paths.length; - return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum); + int threadNum = counter % paths.length; + String filename = String.format(" %s.%06d.channel", namePrefix, (counter++)); + return new ID(new File(paths[threadNum], filename), threadNum); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8038d227/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index f77d9c4..49f697e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -18,68 +18,134 @@ package org.apache.flink.runtime.io.disk.iomanager; +import org.apache.commons.io.FileUtils; +import org.apache.flink.core.memory.MemorySegment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; import java.io.IOException; import java.util.List; import java.util.Random; +import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.core.memory.MemorySegment; - /** * The facade for the provided I/O manager services. */ public abstract class IOManager { - + /** Logging */ protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class); /** The temporary directories for files */ - private final String[] paths; + private final File[] paths; /** A random number generator for the anonymous ChannelIDs. */ private final Random random; - + /** The number of the next path to use. */ private volatile int nextPath; - + + /** Shutdown hook to make sure that the directories are removed on exit */ + private final Thread shutdownHook; + // ------------------------------------------------------------------------- // Constructors / Destructors // ------------------------------------------------------------------------- /** * Constructs a new IOManager. - * - * @param paths - * the basic directory paths for files underlying anonymous channels. + * + * @param tempDirs The basic directories for files underlying anonymous channels. */ - protected IOManager(String[] paths) { - this.paths = paths; + protected IOManager(String[] tempDirs) { + if (tempDirs == null || tempDirs.length == 0) { + throw new IllegalArgumentException("The temporary directories must not be null or empty."); + } + this.random = new Random(); this.nextPath = 0; + + this.paths = new File[tempDirs.length]; + for (int i = 0; i < tempDirs.length; i++) { + File baseDir = new File(tempDirs[i]); + String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString()); + File storageDir = new File(baseDir, subfolder); + + if (!storageDir.exists() && !storageDir.mkdirs()) { + throw new RuntimeException( + "Could not create storage directory for IOManager: " + storageDir.getAbsolutePath()); + } + paths[i] = storageDir; + LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath()); + } + + this.shutdownHook = new Thread("I/O manager shutdown hook") { + @Override + public void run() { + shutdown(); + } + }; + Runtime.getRuntime().addShutdownHook(this.shutdownHook); } /** - * Close method, marks the I/O manager as closed. + * Close method, marks the I/O manager as closed + * and removed all temporary files. */ - public abstract void shutdown(); - + public void shutdown() { + // remove all of our temp directories + for (File path : paths) { + try { + if (path != null) { + if (path.exists()) { + FileUtils.deleteDirectory(path); + LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath()); + } + } + } catch (Throwable t) { + LOG.error("IOManager failed to properly clean up temp file directory: " + path, t); + } + } + + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself + if (shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + } + catch (Throwable t) { + LOG.warn("Exception while unregistering IOManager's shutdown hook.", t); + } + } + } + /** * Utility method to check whether the IO manager has been properly shut down. - * + * For this base implementation, this means that all files have been removed. + * * @return True, if the IO manager has properly shut down, false otherwise. */ - public abstract boolean isProperlyShutDown(); + public boolean isProperlyShutDown() { + for (File path : paths) { + if (path != null && path.exists()) { + return false; + } + } + return true; + } // ------------------------------------------------------------------------ // Channel Instantiations // ------------------------------------------------------------------------ - + /** * Creates a new {@link FileIOChannel.ID} in one of the temp directories. Multiple * invocations of this method spread the channels evenly across the different directories. - * + * * @return A channel to a temporary directory. */ public FileIOChannel.ID createChannel() { @@ -90,22 +156,36 @@ public abstract class IOManager { /** * Creates a new {@link FileIOChannel.Enumerator}, spreading the channels in a round-robin fashion * across the temporary file directories. - * + * * @return An enumerator for channels. */ public FileIOChannel.Enumerator createChannelEnumerator() { return new FileIOChannel.Enumerator(this.paths, this.random); } + /** + * Deletes the file underlying the given channel. If the channel is still open, this + * call may fail. + * + * @param channel The channel to be deleted. + * @throws IOException Thrown if the deletion fails. + */ + public void deleteChannel(FileIOChannel.ID channel) throws IOException { + if (channel != null) { + if (channel.getPathFile().exists() && !channel.getPathFile().delete()) { + LOG.warn("IOManager failed to delete temporary file {}", channel.getPath()); + } + } + } // ------------------------------------------------------------------------ // Reader / Writer instantiations // ------------------------------------------------------------------------ - + /** * Creates a block channel writer that writes to the given channel. The writer adds the * written segment to its return-queue afterwards (to allow for asynchronous implementations). - * + * * @param channelID The descriptor for the channel to write to. * @return A block channel writer that writes to the given channel. * @throws IOException Thrown, if the channel for the writer could not be opened. @@ -113,11 +193,11 @@ public abstract class IOManager { public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID) throws IOException { return createBlockChannelWriter(channelID, new LinkedBlockingQueue()); } - + /** * Creates a block channel writer that writes to the given channel. The writer adds the * written segment to the given queue (to allow for asynchronous implementations). - * + * * @param channelID The descriptor for the channel to write to. * @param returnQueue The queue to put the written buffers into. * @return A block channel writer that writes to the given channel. @@ -125,24 +205,24 @@ public abstract class IOManager { */ public abstract BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID, LinkedBlockingQueue returnQueue) throws IOException; - + /** * Creates a block channel writer that writes to the given channel. The writer calls the given callback * after the I/O operation has been performed (successfully or unsuccessfully), to allow * for asynchronous implementations. - * + * * @param channelID The descriptor for the channel to write to. - * @param callback The callback to be called for + * @param callback The callback to be called for * @return A block channel writer that writes to the given channel. * @throws IOException Thrown, if the channel for the writer could not be opened. */ public abstract BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException; - + /** * Creates a block channel reader that reads blocks from the given channel. The reader pushed * full memory segments (with the read data) to its "return queue", to allow for asynchronous read * implementations. - * + * * @param channelID The descriptor for the channel to write to. * @return A block channel reader that reads from the given channel. * @throws IOException Thrown, if the channel for the reader could not be opened. @@ -150,35 +230,35 @@ public abstract class IOManager { public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID) throws IOException { return createBlockChannelReader(channelID, new LinkedBlockingQueue()); } - + /** * Creates a block channel reader that reads blocks from the given channel. The reader pushes the full segments * to the given queue, to allow for asynchronous implementations. - * + * * @param channelID The descriptor for the channel to write to. * @param returnQueue The queue to put the full buffers into. * @return A block channel reader that reads from the given channel. * @throws IOException Thrown, if the channel for the reader could not be opened. */ - public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, + public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, LinkedBlockingQueue returnQueue) throws IOException; - + /** * Creates a block channel reader that reads all blocks from the given channel directly in one bulk. * The reader draws segments to read the blocks into from a supplied list, which must contain as many - * segments as the channel has blocks. After the reader is done, the list with the full segments can be + * segments as the channel has blocks. After the reader is done, the list with the full segments can be * obtained from the reader. *

- * If a channel is not to be read in one bulk, but in multiple smaller batches, a + * If a channel is not to be read in one bulk, but in multiple smaller batches, a * {@link BlockChannelReader} should be used. - * + * * @param channelID The descriptor for the channel to write to. * @param targetSegments The list to take the segments from into which to read the data. * @param numBlocks The number of blocks in the channel to read. * @return A block channel reader that reads from the given channel. * @throws IOException Thrown, if the channel for the reader could not be opened. */ - public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, + public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, List targetSegments, int numBlocks) throws IOException; // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/8038d227/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java index 1f79067..ca16ddb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java @@ -18,15 +18,16 @@ package org.apache.flink.runtime.io.disk.iomanager; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.util.EnvironmentInformation; + import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.util.EnvironmentInformation; - -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkState; /** * A version of the {@link IOManager} that uses asynchronous I/O. @@ -38,12 +39,9 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle /** The reader threads used for asynchronous block oriented channel reading. */ private final ReaderThread[] readers; - - /** Lock object to guard shutdown */ - private final Object shutdownLock = new Object(); - - /** Flag to mark the I/O manager as alive or shut down */ - private volatile boolean shutdown; + + /** Flag to signify that the IOManager has been shut down already */ + private final AtomicBoolean isShutdown = new AtomicBoolean(); // ------------------------------------------------------------------------- // Constructors / Destructors @@ -103,13 +101,12 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle */ @Override public void shutdown() { - synchronized (shutdownLock) { - if (shutdown) { - return; - } - - shutdown = true; - + // mark shut down and exit if it already was shut down + if (!isShutdown.compareAndSet(false, true)) { + return; + } + + try { if (LOG.isDebugEnabled()) { LOG.debug("Shutting down I/O manager."); } @@ -141,7 +138,14 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle rt.join(); } } - catch (InterruptedException iex) {} + catch (InterruptedException iex) { + // ignore this on shutdown + } + } + finally { + // make sure we all the super implementation in any case and at the last point, + // because this will clean up the I/O directories + super.shutdown(); } } @@ -160,17 +164,17 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle boolean writersShutDown = true; for (WriterThread wt : writers) { - readersShutDown &= wt.getState() == Thread.State.TERMINATED; + writersShutDown &= wt.getState() == Thread.State.TERMINATED; } - return shutdown && writersShutDown && readersShutDown; + return isShutdown.get() && readersShutDown && writersShutDown && super.isProperlyShutDown(); } @Override public void uncaughtException(Thread t, Throwable e) { - LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e); - shutdown(); + LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Shutting down I/O Manager.", e); + shutdown(); } // ------------------------------------------------------------------------ @@ -181,13 +185,13 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID, LinkedBlockingQueue returnQueue) throws IOException { - Preconditions.checkState(!shutdown, "I/O-Manger is closed."); + checkState(!isShutdown.get(), "I/O-Manger is shut down."); return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue); } @Override public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException { - Preconditions.checkState(!shutdown, "I/O-Manger is closed."); + checkState(!isShutdown.get(), "I/O-Manger is shut down."); return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback); } @@ -205,7 +209,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, LinkedBlockingQueue returnQueue) throws IOException { - Preconditions.checkState(!shutdown, "I/O-Manger is closed."); + checkState(!isShutdown.get(), "I/O-Manger is shut down."); return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue); } @@ -228,7 +232,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, List targetSegments, int numBlocks) throws IOException { - Preconditions.checkState(!shutdown, "I/O-Manger is closed."); + checkState(!isShutdown.get(), "I/O-Manger is shut down."); return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks); } @@ -446,4 +450,4 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle } }; // end writer thread -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/8038d227/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java index fc6ea37..55a53d1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java @@ -16,29 +16,22 @@ * limitations under the License. */ - package org.apache.flink.runtime.io.disk.iomanager; import java.io.File; import java.io.IOException; import java.util.List; - -import org.junit.Assert; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel; -import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; -import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; -import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.ReadRequest; -import org.apache.flink.runtime.io.disk.iomanager.WriteRequest; import org.apache.flink.runtime.memory.DefaultMemoryManagerTest.DummyInvokable; import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.*; + public class IOManagerTest { // ------------------------------------------------------------------------ @@ -62,9 +55,9 @@ public class IOManagerTest { @After public void afterTest() { this.ioManager.shutdown(); - Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown()); + assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown()); - Assert.assertTrue("Not all memory was returned to the memory manager in the test.", this.memoryManager.verifyEmpty()); + assertTrue("Not all memory was returned to the memory manager in the test.", this.memoryManager.verifyEmpty()); this.memoryManager.shutdown(); this.memoryManager = null; } @@ -75,22 +68,52 @@ public class IOManagerTest { // ------------------------------------------------------------------------ - /** - * Tests that the channel enumerator creates channels in the temporary files directory. - */ @Test public void channelEnumerator() { - File tempPath = new File(System.getProperty("java.io.tmpdir")); - - FileIOChannel.Enumerator enumerator = ioManager.createChannelEnumerator(); + IOManager ioMan = null; - for (int i = 0; i < 10; i++) { - FileIOChannel.ID id = enumerator.next(); - - File path = new File(id.getPath()); - Assert.assertTrue("Channel IDs must name an absolute path.", path.isAbsolute()); - Assert.assertFalse("Channel IDs must name a file, not a directory.", path.isDirectory()); - Assert.assertTrue("Path is not in the temp directory.", tempPath.equals(path.getParentFile())); + try { + File tempPath = new File(System.getProperty("java.io.tmpdir")); + + String[] tempDirs = new String[]{ + new File(tempPath, "a").getAbsolutePath(), + new File(tempPath, "b").getAbsolutePath(), + new File(tempPath, "c").getAbsolutePath(), + new File(tempPath, "d").getAbsolutePath(), + new File(tempPath, "e").getAbsolutePath(), + }; + + int[] counters = new int[tempDirs.length]; + + ioMan = new TestIOManager(tempDirs); + FileIOChannel.Enumerator enumerator = ioMan.createChannelEnumerator(); + + for (int i = 0; i < 3 * tempDirs.length; i++) { + FileIOChannel.ID id = enumerator.next(); + + File path = id.getPathFile(); + + assertTrue("Channel IDs must name an absolute path.", path.isAbsolute()); + assertFalse("Channel IDs must name a file, not a directory.", path.isDirectory()); + + assertTrue("Path is not in the temp directory.", + tempPath.equals(path.getParentFile().getParentFile().getParentFile())); + + for (int k = 0; k < tempDirs.length; k++) { + if (path.getParentFile().getParent().equals(tempDirs[k])) { + counters[k]++; + } + } + } + + for (int k = 0; k < tempDirs.length; k++) { + assertEquals(3, counters[k]); + } + } + finally { + if (ioMan != null) { + ioMan.shutdown(); + } } } @@ -124,7 +147,7 @@ public class IOManagerTest { for (int pos = 0; pos < memSeg.size(); pos += 4) { if (memSeg.getInt(pos) != i) { - Assert.fail("Read memory segment contains invalid data."); + fail("Read memory segment contains invalid data."); } } } @@ -135,7 +158,7 @@ public class IOManagerTest { } catch (Exception ex) { ex.printStackTrace(); - Assert.fail("TEst encountered an exception: " + ex.getMessage()); + fail("TEst encountered an exception: " + ex.getMessage()); } } @@ -175,7 +198,7 @@ public class IOManagerTest { for (int pos = 0; pos < memSeg.size(); pos += 4) { if (memSeg.getInt(pos) != i) { - Assert.fail("Read memory segment contains invalid data."); + fail("Read memory segment contains invalid data."); } } reader.readBlock(memSeg); @@ -192,7 +215,7 @@ public class IOManagerTest { } catch (Exception ex) { ex.printStackTrace(); - Assert.fail("TEst encountered an exception: " + ex.getMessage()); + fail("TEst encountered an exception: " + ex.getMessage()); } } @@ -253,4 +276,33 @@ public class IOManagerTest { final class TestIOException extends IOException { private static final long serialVersionUID = -814705441998024472L; } + + // -------------------------------------------------------------------------------------------- + + private static class TestIOManager extends IOManager { + + protected TestIOManager(String[] paths) { + super(paths); + } + + @Override + public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID, LinkedBlockingQueue returnQueue) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, LinkedBlockingQueue returnQueue) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, List targetSegments, int numBlocks) throws IOException { + throw new UnsupportedOperationException(); + } + } }