flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] flink git commit: [FLINK-1483] IOManager puts temp files in dedicated directory and removes that on shutdown
Date Thu, 19 Feb 2015 09:36:23 GMT
[FLINK-1483] IOManager puts temp files in dedicated directory and removes that on shutdown

This closes #417


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1a334e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1a334e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1a334e1

Branch: refs/heads/master
Commit: c1a334e1c76afebe0435010b6203c109d86f043e
Parents: 681ea06
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Feb 18 15:03:25 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 19 10:35:28 2015 +0100

----------------------------------------------------------------------
 .../io/disk/iomanager/FileIOChannel.java        | 34 +++++---
 .../runtime/io/disk/iomanager/IOManager.java    | 88 +++++++++++++++++---
 .../io/disk/iomanager/IOManagerAsync.java       | 48 ++++++-----
 .../io/disk/iomanager/IOManagerTest.java        | 81 +++++++++---------
 4 files changed, 166 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c1a334e1/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index d6f4458..e00568e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -84,24 +84,33 @@ public interface FileIOChannel {
 		
 		private static final int RANDOM_BYTES_LENGTH = 16;
 		
-		private final String path;
+		private final File path;
 		
 		private final int threadNum;
 
-		protected ID(String path, int threadNum) {
+		protected ID(File path, int threadNum) {
 			this.path = path;
 			this.threadNum = threadNum;
 		}
 
-		protected ID(String basePath, int threadNum, Random random) {
-			this.path = basePath + File.separator + randomString(random) + ".channel";
+		protected ID(File basePath, int threadNum, Random random) {
+			this.path = new File(basePath, randomString(random) + ".channel");
 			this.threadNum = threadNum;
 		}
 
 		/**
 		 * Returns the path to the underlying temporary file.
+		 * @return The path to the underlying temporary file.
 		 */
 		public String getPath() {
+			return path.getAbsolutePath();
+		}
+
+		/**
+		 * Returns the path to the underlying temporary file as a File.
+		 * @return The path to the underlying temporary file as a File.
+		 */
+		public File getPathFile() {
 			return path;
 		}
 		
@@ -126,11 +135,11 @@ public interface FileIOChannel {
 		
 		@Override
 		public String toString() {
-			return path;
+			return path.getAbsolutePath();
 		}
 		
-		private static final String randomString(final Random random) {
-			final byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
+		private static String randomString(Random random) {
+			byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
 			random.nextBytes(bytes);
 			return StringUtils.byteToHexString(bytes);
 		}
@@ -140,24 +149,23 @@ public interface FileIOChannel {
 	 * An enumerator for channels that logically belong together.
 	 */
 	public static final class Enumerator {
-		
-		private static final String FORMAT = "%s%s%s.%06d.channel";
 
-		private final String[] paths;
+		private final File[] paths;
 		
 		private final String namePrefix;
 
 		private int counter;
 
-		protected Enumerator(String[] basePaths, Random random) {
+		protected Enumerator(File[] basePaths, Random random) {
 			this.paths = basePaths;
 			this.namePrefix = ID.randomString(random);
 			this.counter = 0;
 		}
 
 		public ID next() {
-			final int threadNum = counter % paths.length;
-			return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
+			int threadNum = counter % paths.length;
+			String filename = String.format(" %s.%06d.channel", namePrefix, (counter++));
+			return new ID(new File(paths[threadNum], filename), threadNum);
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c1a334e1/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 6cf19f3..c04ba97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.flink.core.memory.MemorySegment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,6 +27,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
@@ -37,7 +39,7 @@ public abstract class IOManager {
 	protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
 	/** The temporary directories for files */
-	private final String[] paths;
+	private final File[] paths;
 
 	/** A random number generator for the anonymous ChannelIDs. */
 	private final Random random;
@@ -45,6 +47,9 @@ public abstract class IOManager {
 	/** The number of the next path to use. */
 	private volatile int nextPath;
 
+	/** Shutdown hook to make sure that the directories are removed on exit */
+	private final Thread shutdownHook;
+
 	// -------------------------------------------------------------------------
 	//               Constructors / Destructors
 	// -------------------------------------------------------------------------
@@ -52,26 +57,86 @@ public abstract class IOManager {
 	/**
 	 * Constructs a new IOManager.
 	 *
-	 * @param paths
-	 *        the basic directory paths for files underlying anonymous channels.
+	 * @param tempDirs The basic directories for files underlying anonymous channels.
 	 */
-	protected IOManager(String[] paths) {
-		this.paths = paths;
+	protected IOManager(String[] tempDirs) {
+		if (tempDirs == null || tempDirs.length == 0) {
+			throw new IllegalArgumentException("The temporary directories must not be null or empty.");
+		}
+
 		this.random = new Random();
 		this.nextPath = 0;
+
+		this.paths = new File[tempDirs.length];
+		for (int i = 0; i < tempDirs.length; i++) {
+			File baseDir = new File(tempDirs[i]);
+			String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString());
+			File storageDir = new File(baseDir, subfolder);
+
+			if (!storageDir.exists() && !storageDir.mkdirs()) {
+				throw new RuntimeException(
+						"Could not create storage directory for IOManager: " + storageDir.getAbsolutePath());
+			}
+			paths[i] = storageDir;
+			LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath());
+		}
+
+		this.shutdownHook = new Thread("I/O manager shutdown hook") {
+			@Override
+			public void run() {
+				shutdown();
+			}
+		};
+		Runtime.getRuntime().addShutdownHook(this.shutdownHook);
 	}
 
 	/**
-	 * Close method, marks the I/O manager as closed.
+	 * Close method, marks the I/O manager as closed
+	 * and removes all temporary files.
 	 */
-	public abstract void shutdown();
+	public void shutdown() {
+		// remove all of our temp directories
+		for (File path : paths) {
+			try {
+				if (path != null) {
+					if (path.exists()) {
+						FileUtils.deleteDirectory(path);
+						LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath());
+					}
+				}
+			} catch (Throwable t) {
+				LOG.error("IOManager failed to properly clean up temp file directory: " + path, t);
+			}
+		}
+
+		// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
+		if (shutdownHook != Thread.currentThread()) {
+			try {
+				Runtime.getRuntime().removeShutdownHook(shutdownHook);
+			}
+			catch (IllegalStateException e) {
+				// race, JVM is in shutdown already, we can safely ignore this
+			}
+			catch (Throwable t) {
+				LOG.warn("Exception while unregistering IOManager's shutdown hook.", t);
+			}
+		}
+	}
 
 	/**
 	 * Utility method to check whether the IO manager has been properly shut down.
+	 * For this base implementation, this means that all files have been removed.
 	 *
 	 * @return True, if the IO manager has properly shut down, false otherwise.
 	 */
-	public abstract boolean isProperlyShutDown();
+	public boolean isProperlyShutDown() {
+		for (File path : paths) {
+			if (path != null && path.exists()) {
+				return false;
+			}
+		}
+		return true;
+	}
 
 	// ------------------------------------------------------------------------
 	//                          Channel Instantiations
@@ -107,7 +172,9 @@ public abstract class IOManager {
 	 */
 	public void deleteChannel(FileIOChannel.ID channel) throws IOException {
 		if (channel != null) {
-			new File(channel.getPath()).delete();
+			if (channel.getPathFile().exists() && !channel.getPathFile().delete()) {
+				LOG.warn("IOManager failed to delete temporary file {}", channel.getPath());
+			}
 		}
 	}
 	
@@ -193,7 +260,8 @@ public abstract class IOManager {
 	 */
 	public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
 			List<MemorySegment> targetSegments, int numBlocks) throws IOException;
-	
+
+
 	// ------------------------------------------------------------------------
 	//                          Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c1a334e1/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 6489396..2396665 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Preconditions.checkState;
 
@@ -38,12 +39,9 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 
 	/** The reader threads used for asynchronous block oriented channel reading. */
 	private final ReaderThread[] readers;
-	
-	/** Lock object to guard shutdown */
-	private final Object shutdownLock = new Object();
-	
-	/** Flag to mark the I/O manager as alive or shut down */
-	private volatile boolean shutdown;
+
+	/** Flag to signify that the IOManager has been shut down already */
+	private final AtomicBoolean isShutdown = new AtomicBoolean();
 	
 	// -------------------------------------------------------------------------
 	//               Constructors / Destructors
@@ -103,13 +101,12 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	 */
 	@Override
 	public void shutdown() {
-		synchronized (shutdownLock) {
-			if (shutdown) {
-				return;
-			}
-			
-			shutdown = true;
-			
+		// mark shut down and exit if it already was shut down
+		if (!isShutdown.compareAndSet(false, true)) {
+			return;
+		}
+
+		try {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Shutting down I/O manager.");
 			}
@@ -141,7 +138,14 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 					rt.join();
 				}
 			}
-			catch (InterruptedException iex) {}
+			catch (InterruptedException iex) {
+				// ignore this on shutdown
+			}
+		}
+		finally {
+			// make sure we call the super implementation in any case and as the last step,
+			// because this will clean up the I/O directories
+			super.shutdown();
 		}
 	}
 	
@@ -160,17 +164,17 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 		
 		boolean writersShutDown = true;
 		for (WriterThread wt : writers) {
-			readersShutDown &= wt.getState() == Thread.State.TERMINATED;
+			writersShutDown &= wt.getState() == Thread.State.TERMINATED;
 		}
 		
-		return shutdown && writersShutDown && readersShutDown;
+		return isShutdown.get() && readersShutDown && writersShutDown && super.isProperlyShutDown();
 	}
 
 
 	@Override
 	public void uncaughtException(Thread t, Throwable e) {
-		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
-		shutdown();	
+		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Shutting down I/O Manager.", e);
+		shutdown();
 	}
 	
 	// ------------------------------------------------------------------------
@@ -181,13 +185,13 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
 								LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
 	{
-		checkState(!shutdown, "I/O-Manger is closed.");
+		checkState(!isShutdown.get(), "I/O-Manger is shut down.");
 		return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
 	}
 	
 	@Override
 	public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
-		checkState(!shutdown, "I/O-Manger is closed.");
+		checkState(!isShutdown.get(), "I/O-Manger is shut down.");
 		return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
 	}
 	
@@ -205,7 +209,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
 										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
 	{
-		checkState(!shutdown, "I/O-Manger is closed.");
+		checkState(!isShutdown.get(), "I/O-Manger is shut down.");
 		return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
 	}
 	
@@ -228,7 +232,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
 			List<MemorySegment> targetSegments,	int numBlocks) throws IOException
 	{
-		checkState(!shutdown, "I/O-Manger is closed.");
+		checkState(!isShutdown.get(), "I/O-Manger is shut down.");
 		return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/c1a334e1/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index 4be667a..5f5bed3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -34,41 +34,50 @@ public class IOManagerTest {
 
 	@Test
 	public void channelEnumerator() {
-		File tempPath = new File(System.getProperty("java.io.tmpdir"));
-		
-		String[] tempDirs = new String[] {
-			new File(tempPath, "a").getAbsolutePath(),
-			new File(tempPath, "b").getAbsolutePath(),
-			new File(tempPath, "c").getAbsolutePath(),
-			new File(tempPath, "d").getAbsolutePath(),
-			new File(tempPath, "e").getAbsolutePath(),
-		};
-		
-		int[] counters = new int[tempDirs.length];
-		
-		
-		FileIOChannel.Enumerator enumerator = new TestIOManager(tempDirs).createChannelEnumerator();
-
-		for (int i = 0; i < 3 * tempDirs.length; i++) {
-			FileIOChannel.ID id = enumerator.next();
-			
-			File path = new File(id.getPath());
-			
-			assertTrue("Channel IDs must name an absolute path.", path.isAbsolute());
-			
-			assertFalse("Channel IDs must name a file, not a directory.", path.isDirectory());
-			
-			assertTrue("Path is not in the temp directory.", tempPath.equals(path.getParentFile().getParentFile()));
-			
-			for (int k = 0; k < tempDirs.length; k++) {
-				if (path.getParent().equals(tempDirs[k])) {
-					counters[k]++;
+		IOManager ioMan = null;
+
+		try {
+			File tempPath = new File(System.getProperty("java.io.tmpdir"));
+
+			String[] tempDirs = new String[]{
+					new File(tempPath, "a").getAbsolutePath(),
+					new File(tempPath, "b").getAbsolutePath(),
+					new File(tempPath, "c").getAbsolutePath(),
+					new File(tempPath, "d").getAbsolutePath(),
+					new File(tempPath, "e").getAbsolutePath(),
+			};
+
+			int[] counters = new int[tempDirs.length];
+
+			ioMan = new TestIOManager(tempDirs);
+			FileIOChannel.Enumerator enumerator = ioMan.createChannelEnumerator();
+
+			for (int i = 0; i < 3 * tempDirs.length; i++) {
+				FileIOChannel.ID id = enumerator.next();
+
+				File path = id.getPathFile();
+
+				assertTrue("Channel IDs must name an absolute path.", path.isAbsolute());
+				assertFalse("Channel IDs must name a file, not a directory.", path.isDirectory());
+
+				assertTrue("Path is not in the temp directory.",
+						tempPath.equals(path.getParentFile().getParentFile().getParentFile()));
+
+				for (int k = 0; k < tempDirs.length; k++) {
+					if (path.getParentFile().getParent().equals(tempDirs[k])) {
+						counters[k]++;
+					}
 				}
 			}
+
+			for (int k = 0; k < tempDirs.length; k++) {
+				assertEquals(3, counters[k]);
+			}
 		}
-		
-		for (int k = 0; k < tempDirs.length; k++) {
-			assertEquals(3, counters[k]);
+		finally {
+			if (ioMan != null) {
+				ioMan.shutdown();
+			}
 		}
 	}
 	
@@ -81,14 +90,6 @@ public class IOManagerTest {
 		}
 
 		@Override
-		public void shutdown() {}
-
-		@Override
-		public boolean isProperlyShutDown() {
-			return false;
-		}
-
-		@Override
 		public BlockChannelWriter createBlockChannelWriter(ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) {
 			throw new UnsupportedOperationException();
 		}


Mime
View raw message