Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 23E061027A for ; Thu, 19 Feb 2015 09:36:23 +0000 (UTC) Received: (qmail 59607 invoked by uid 500); 19 Feb 2015 09:36:23 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 59535 invoked by uid 500); 19 Feb 2015 09:36:23 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 59525 invoked by uid 99); 19 Feb 2015 09:36:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Feb 2015 09:36:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DA055E0664; Thu, 19 Feb 2015 09:36:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Thu, 19 Feb 2015 09:36:23 -0000 Message-Id: In-Reply-To: <0e8db2c8ae1643a8826adb019aa5a21f@git.apache.org> References: <0e8db2c8ae1643a8826adb019aa5a21f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] flink git commit: [FLINK-1483] IOManager puts temp files in dedicated directory and removes that on shutdown [FLINK-1483] IOManager puts temp files in dedicated directory and removes that on shutdown This closes #417 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1a334e1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1a334e1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1a334e1 Branch: refs/heads/master Commit: 
c1a334e1c76afebe0435010b6203c109d86f043e Parents: 681ea06 Author: Stephan Ewen Authored: Wed Feb 18 15:03:25 2015 +0100 Committer: Stephan Ewen Committed: Thu Feb 19 10:35:28 2015 +0100 ---------------------------------------------------------------------- .../io/disk/iomanager/FileIOChannel.java | 34 +++++--- .../runtime/io/disk/iomanager/IOManager.java | 88 +++++++++++++++++--- .../io/disk/iomanager/IOManagerAsync.java | 48 ++++++----- .../io/disk/iomanager/IOManagerTest.java | 81 +++++++++--------- 4 files changed, 166 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c1a334e1/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java index d6f4458..e00568e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java @@ -84,24 +84,33 @@ public interface FileIOChannel { private static final int RANDOM_BYTES_LENGTH = 16; - private final String path; + private final File path; private final int threadNum; - protected ID(String path, int threadNum) { + protected ID(File path, int threadNum) { this.path = path; this.threadNum = threadNum; } - protected ID(String basePath, int threadNum, Random random) { - this.path = basePath + File.separator + randomString(random) + ".channel"; + protected ID(File basePath, int threadNum, Random random) { + this.path = new File(basePath, randomString(random) + ".channel"); this.threadNum = threadNum; } /** * Returns the path to the underlying temporary file. 
+ * @return The path to the underlying temporary file. */ public String getPath() { + return path.getAbsolutePath(); + } + + /** + * Returns the path to the underlying temporary file as a File. + * @return The path to the underlying temporary file as a File. + */ + public File getPathFile() { return path; } @@ -126,11 +135,11 @@ public interface FileIOChannel { @Override public String toString() { - return path; + return path.getAbsolutePath(); } - private static final String randomString(final Random random) { - final byte[] bytes = new byte[RANDOM_BYTES_LENGTH]; + private static String randomString(Random random) { + byte[] bytes = new byte[RANDOM_BYTES_LENGTH]; random.nextBytes(bytes); return StringUtils.byteToHexString(bytes); } @@ -140,24 +149,23 @@ public interface FileIOChannel { * An enumerator for channels that logically belong together. */ public static final class Enumerator { - - private static final String FORMAT = "%s%s%s.%06d.channel"; - private final String[] paths; + private final File[] paths; private final String namePrefix; private int counter; - protected Enumerator(String[] basePaths, Random random) { + protected Enumerator(File[] basePaths, Random random) { this.paths = basePaths; this.namePrefix = ID.randomString(random); this.counter = 0; } public ID next() { - final int threadNum = counter % paths.length; - return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum); + int threadNum = counter % paths.length; + String filename = String.format(" %s.%06d.channel", namePrefix, (counter++)); + return new ID(new File(paths[threadNum], filename), threadNum); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/c1a334e1/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index 6cf19f3..c04ba97 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.disk.iomanager; +import org.apache.commons.io.FileUtils; import org.apache.flink.core.memory.MemorySegment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +27,7 @@ import java.io.File; import java.io.IOException; import java.util.List; import java.util.Random; +import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; /** @@ -37,7 +39,7 @@ public abstract class IOManager { protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class); /** The temporary directories for files */ - private final String[] paths; + private final File[] paths; /** A random number generator for the anonymous ChannelIDs. */ private final Random random; @@ -45,6 +47,9 @@ public abstract class IOManager { /** The number of the next path to use. */ private volatile int nextPath; + /** Shutdown hook to make sure that the directories are removed on exit */ + private final Thread shutdownHook; + // ------------------------------------------------------------------------- // Constructors / Destructors // ------------------------------------------------------------------------- @@ -52,26 +57,86 @@ public abstract class IOManager { /** * Constructs a new IOManager. * - * @param paths - * the basic directory paths for files underlying anonymous channels. + * @param tempDirs The basic directories for files underlying anonymous channels. 
*/ - protected IOManager(String[] paths) { - this.paths = paths; + protected IOManager(String[] tempDirs) { + if (tempDirs == null || tempDirs.length == 0) { + throw new IllegalArgumentException("The temporary directories must not be null or empty."); + } + this.random = new Random(); this.nextPath = 0; + + this.paths = new File[tempDirs.length]; + for (int i = 0; i < tempDirs.length; i++) { + File baseDir = new File(tempDirs[i]); + String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString()); + File storageDir = new File(baseDir, subfolder); + + if (!storageDir.exists() && !storageDir.mkdirs()) { + throw new RuntimeException( + "Could not create storage directory for IOManager: " + storageDir.getAbsolutePath()); + } + paths[i] = storageDir; + LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath()); + } + + this.shutdownHook = new Thread("I/O manager shutdown hook") { + @Override + public void run() { + shutdown(); + } + }; + Runtime.getRuntime().addShutdownHook(this.shutdownHook); } /** - * Close method, marks the I/O manager as closed. + * Close method, marks the I/O manager as closed + * and removes all temporary files. 
*/ - public abstract void shutdown(); + public void shutdown() { + // remove all of our temp directories + for (File path : paths) { + try { + if (path != null) { + if (path.exists()) { + FileUtils.deleteDirectory(path); + LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath()); + } + } + } catch (Throwable t) { + LOG.error("IOManager failed to properly clean up temp file directory: " + path, t); + } + } + + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself + if (shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + } + catch (Throwable t) { + LOG.warn("Exception while unregistering IOManager's shutdown hook.", t); + } + } + } /** * Utility method to check whether the IO manager has been properly shut down. + * For this base implementation, this means that all files have been removed. * * @return True, if the IO manager has properly shut down, false otherwise. 
*/ - public abstract boolean isProperlyShutDown(); + public boolean isProperlyShutDown() { + for (File path : paths) { + if (path != null && path.exists()) { + return false; + } + } + return true; + } // ------------------------------------------------------------------------ // Channel Instantiations @@ -107,7 +172,9 @@ public abstract class IOManager { */ public void deleteChannel(FileIOChannel.ID channel) throws IOException { if (channel != null) { - new File(channel.getPath()).delete(); + if (channel.getPathFile().exists() && !channel.getPathFile().delete()) { + LOG.warn("IOManager failed to delete temporary file {}", channel.getPath()); + } } } @@ -193,7 +260,8 @@ public abstract class IOManager { */ public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, List targetSegments, int numBlocks) throws IOException; - + + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/c1a334e1/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java index 6489396..2396665 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import static com.google.common.base.Preconditions.checkState; @@ -38,12 +39,9 @@ public class 
IOManagerAsync extends IOManager implements UncaughtExceptionHandle /** The reader threads used for asynchronous block oriented channel reading. */ private final ReaderThread[] readers; - - /** Lock object to guard shutdown */ - private final Object shutdownLock = new Object(); - - /** Flag to mark the I/O manager as alive or shut down */ - private volatile boolean shutdown; + + /** Flag to signify that the IOManager has been shut down already */ + private final AtomicBoolean isShutdown = new AtomicBoolean(); // ------------------------------------------------------------------------- // Constructors / Destructors @@ -103,13 +101,12 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle */ @Override public void shutdown() { - synchronized (shutdownLock) { - if (shutdown) { - return; - } - - shutdown = true; - + // mark shut down and exit if it already was shut down + if (!isShutdown.compareAndSet(false, true)) { + return; + } + + try { if (LOG.isDebugEnabled()) { LOG.debug("Shutting down I/O manager."); } @@ -141,7 +138,14 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle rt.join(); } } - catch (InterruptedException iex) {} + catch (InterruptedException iex) { + // ignore this on shutdown + } + } + finally { + // make sure we call the super implementation in any case and at the last point, + // because this will clean up the I/O directories + super.shutdown(); } } @@ -160,17 +164,17 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle boolean writersShutDown = true; for (WriterThread wt : writers) { - readersShutDown &= wt.getState() == Thread.State.TERMINATED; + writersShutDown &= wt.getState() == Thread.State.TERMINATED; } - return shutdown && writersShutDown && readersShutDown; + return isShutdown.get() && readersShutDown && writersShutDown && super.isProperlyShutDown(); } @Override public void uncaughtException(Thread t, Throwable e) { - LOG.error("IO Thread '" + 
t.getName() + "' terminated due to an exception. Closing I/O Manager.", e); - shutdown(); + LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Shutting down I/O Manager.", e); + shutdown(); } // ------------------------------------------------------------------------ @@ -181,13 +185,13 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID, LinkedBlockingQueue returnQueue) throws IOException { - checkState(!shutdown, "I/O-Manger is closed."); + checkState(!isShutdown.get(), "I/O-Manger is shut down."); return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue); } @Override public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException { - checkState(!shutdown, "I/O-Manger is closed."); + checkState(!isShutdown.get(), "I/O-Manger is shut down."); return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback); } @@ -205,7 +209,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, LinkedBlockingQueue returnQueue) throws IOException { - checkState(!shutdown, "I/O-Manger is closed."); + checkState(!isShutdown.get(), "I/O-Manger is shut down."); return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue); } @@ -228,7 +232,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, List targetSegments, int numBlocks) throws IOException { - checkState(!shutdown, "I/O-Manger is closed."); + checkState(!isShutdown.get(), "I/O-Manger is shut down."); return new 
AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks); } http://git-wip-us.apache.org/repos/asf/flink/blob/c1a334e1/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java index 4be667a..5f5bed3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java @@ -34,41 +34,50 @@ public class IOManagerTest { @Test public void channelEnumerator() { - File tempPath = new File(System.getProperty("java.io.tmpdir")); - - String[] tempDirs = new String[] { - new File(tempPath, "a").getAbsolutePath(), - new File(tempPath, "b").getAbsolutePath(), - new File(tempPath, "c").getAbsolutePath(), - new File(tempPath, "d").getAbsolutePath(), - new File(tempPath, "e").getAbsolutePath(), - }; - - int[] counters = new int[tempDirs.length]; - - - FileIOChannel.Enumerator enumerator = new TestIOManager(tempDirs).createChannelEnumerator(); - - for (int i = 0; i < 3 * tempDirs.length; i++) { - FileIOChannel.ID id = enumerator.next(); - - File path = new File(id.getPath()); - - assertTrue("Channel IDs must name an absolute path.", path.isAbsolute()); - - assertFalse("Channel IDs must name a file, not a directory.", path.isDirectory()); - - assertTrue("Path is not in the temp directory.", tempPath.equals(path.getParentFile().getParentFile())); - - for (int k = 0; k < tempDirs.length; k++) { - if (path.getParent().equals(tempDirs[k])) { - counters[k]++; + IOManager ioMan = null; + + try { + File tempPath = new File(System.getProperty("java.io.tmpdir")); + + String[] tempDirs = new String[]{ + new 
File(tempPath, "a").getAbsolutePath(), + new File(tempPath, "b").getAbsolutePath(), + new File(tempPath, "c").getAbsolutePath(), + new File(tempPath, "d").getAbsolutePath(), + new File(tempPath, "e").getAbsolutePath(), + }; + + int[] counters = new int[tempDirs.length]; + + ioMan = new TestIOManager(tempDirs); + FileIOChannel.Enumerator enumerator = ioMan.createChannelEnumerator(); + + for (int i = 0; i < 3 * tempDirs.length; i++) { + FileIOChannel.ID id = enumerator.next(); + + File path = id.getPathFile(); + + assertTrue("Channel IDs must name an absolute path.", path.isAbsolute()); + assertFalse("Channel IDs must name a file, not a directory.", path.isDirectory()); + + assertTrue("Path is not in the temp directory.", + tempPath.equals(path.getParentFile().getParentFile().getParentFile())); + + for (int k = 0; k < tempDirs.length; k++) { + if (path.getParentFile().getParent().equals(tempDirs[k])) { + counters[k]++; + } } } + + for (int k = 0; k < tempDirs.length; k++) { + assertEquals(3, counters[k]); + } } - - for (int k = 0; k < tempDirs.length; k++) { - assertEquals(3, counters[k]); + finally { + if (ioMan != null) { + ioMan.shutdown(); + } } } @@ -81,14 +90,6 @@ public class IOManagerTest { } @Override - public void shutdown() {} - - @Override - public boolean isProperlyShutDown() { - return false; - } - - @Override public BlockChannelWriter createBlockChannelWriter(ID channelID, LinkedBlockingQueue returnQueue) { throw new UnsupportedOperationException(); }