To: mapreduce-commits@hadoop.apache.org
From: dhruba@apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm
Subject: svn commit: r898486 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/filecache/ src/java/org/apache/hadoop/mapreduce/util/ src/test/mapred/org/apache/hadoop/mapreduce/util/
Date: Tue, 12 Jan 2010 19:49:42 -0000
Message-Id: <20100112194942.8903623889BF@eris.apache.org>
X-Mailer: svnmailer-1.0.8

Author: dhruba
Date: Tue Jan 12 19:49:41 2010
New Revision: 898486

URL: http://svn.apache.org/viewvc?rev=898486&view=rev
Log:
MAPREDUCE-1302. TrackerDistributedCacheManager deletes file asynchronously,
thus reducing task initialization delays. (Zheng Shao via dhruba)

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jan 12 19:49:41 2010
@@ -107,6 +107,10 @@
     comparisons, permitting non-Writable intermediate data. (Aaron Kimball
     via cutting)
 
+    MAPREDUCE-1302. TrackerDistributedCacheManager deletes file
+    asynchronously, thus reducing task initialization delays.
+    (Zheng Shao via dhruba)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270.
Fix the tasktracker to optionally send an out-of-band Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=898486&r1=898485&r2=898486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Tue Jan 12 19:49:41 2010 @@ -456,6 +456,11 @@ return getStrings(MRConfig.LOCAL_DIR); } + /** + * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead. + * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes() + */ + @Deprecated public void deleteLocalFiles() throws IOException { String[] localDirs = getLocalDirs(); for (int i = 0; i < localDirs.length; i++) { Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=898486&r1=898485&r2=898486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jan 12 19:49:41 2010 @@ -90,7 +90,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UnixUserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.ConfiguredPolicy; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; @@ -566,10 +565,11 @@ fConf.get(TT_DNS_NAMESERVER,"default")); } - //check local disk and start async disk service + // Check local disk, start async disk service, and clean up all + // local directories. checkLocalDirs(this.fConf.getLocalDirs()); - asyncDiskService = new MRAsyncDiskService(FileSystem.getLocal(fConf), fConf.getLocalDirs()); - asyncDiskService.moveAndDeleteFromEachVolume(SUBDIR); + asyncDiskService = new MRAsyncDiskService(fConf); + asyncDiskService.cleanupAllVolumes(); // Clear out state tables this.tasks.clear(); @@ -640,12 +640,14 @@ taskController = (TaskController) ReflectionUtils.newInstance( taskControllerClass, fConf); + // setup and create jobcache directory with appropriate permissions taskController.setup(); // Initialize DistributedCache - this.distributedCacheManager = new TrackerDistributedCacheManager( - this.fConf, taskController); + this.distributedCacheManager = + new TrackerDistributedCacheManager(this.fConf, taskController, + asyncDiskService); this.jobClient = (InterTrackerProtocol) RPC.waitForProxy(InterTrackerProtocol.class, @@ -694,10 +696,14 @@ t, TaskTrackerInstrumentation.class); } - /** - * Removes all contents of temporary storage. Called upon + /** + * Removes all contents of temporary storage. Called upon * startup, to remove any leftovers from previous run. + * + * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead. 
+ * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes() */ + @Deprecated public void cleanupStorage() throws IOException { this.fConf.deleteLocalFiles(); } @@ -1092,9 +1098,23 @@ this.running = false; - // Clear local storage - cleanupStorage(); - + if (asyncDiskService != null) { + // Clear local storage + asyncDiskService.cleanupAllVolumes(); + + // Shutdown all async deletion threads with up to 10 seconds of delay + asyncDiskService.shutdown(); + try { + if (!asyncDiskService.awaitTermination(10000)) { + asyncDiskService.shutdownNow(); + asyncDiskService = null; + } + } catch (InterruptedException e) { + asyncDiskService.shutdownNow(); + asyncDiskService = null; + } + } + // Shutdown the fetcher thread this.mapEventsFetcher.interrupt(); Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=898486&r1=898485&r2=898486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Jan 12 19:49:41 2010 @@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; +import org.apache.hadoop.mapreduce.util.MRAsyncDiskService; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -77,6 +78,8 @@ private Random random = new Random(); + private MRAsyncDiskService asyncDiskService; + public TrackerDistributedCacheManager(Configuration conf, TaskController taskController) throws IOException { this.localFs = FileSystem.getLocal(conf); @@ -86,6 +89,18 @@ } /** + * Creates a TrackerDistributedCacheManager with a MRAsyncDiskService. + * @param asyncDiskService Provides a set of ThreadPools for async disk + * operations. + */ + public TrackerDistributedCacheManager(Configuration conf, + TaskController taskController, MRAsyncDiskService asyncDiskService) + throws IOException { + this(conf, taskController); + this.asyncDiskService = asyncDiskService; + } + + /** * Get the locally cached file or archive; it could either be * previously cached (and valid) or copy it from the {@link FileSystem} now. * @@ -252,8 +267,8 @@ // do the deletion, after releasing the global lock for (CacheStatus lcacheStatus : deleteSet) { synchronized (lcacheStatus) { - FileSystem.getLocal(conf).delete(lcacheStatus.localizedLoadPath, true); - LOG.info("Deleted path " + lcacheStatus.localizedLoadPath); + deleteLocalPath(asyncDiskService, + FileSystem.getLocal(conf), lcacheStatus.localizedLoadPath); // decrement the size of the cache from baseDirSize synchronized (baseDirSize) { Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir); @@ -269,6 +284,30 @@ } } + /** + * Delete a local path with asyncDiskService if available, + * or otherwise synchronously with local file system. 
+ */ + private static void deleteLocalPath(MRAsyncDiskService asyncDiskService, + LocalFileSystem fs, Path path) throws IOException { + boolean deleted = false; + if (asyncDiskService != null) { + // Try to delete using asyncDiskService + String localPathToDelete = + path.toUri().getPath(); + deleted = asyncDiskService.moveAndDeleteAbsolutePath(localPathToDelete); + if (!deleted) { + LOG.warn("Cannot find DistributedCache path " + localPathToDelete + + " on any of the asyncDiskService volumes!"); + } + } + if (!deleted) { + // If no asyncDiskService, we will delete the files synchronously + fs.delete(path, true); + } + LOG.info("Deleted path " + path); + } + /* * Returns the relative path of the dir this cache will be localized in * relative path that this cache will be localized in. For @@ -620,7 +659,7 @@ synchronized (cachedArchives) { for (Map.Entry f: cachedArchives.entrySet()) { try { - localFs.delete(f.getValue().localizedLoadPath, true); + deleteLocalPath(asyncDiskService, localFs, f.getValue().localizedLoadPath); } catch (IOException ie) { LOG.debug("Error cleaning up cache", ie); } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java?rev=898486&r1=898485&r2=898486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java Tue Jan 12 19:49:41 2010 @@ -22,31 +22,41 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.AsyncDiskService; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; -/* +/** * This class is a container of multiple thread pools, each for a volume, * so that we can schedule async disk operations easily. * * Examples of async disk operations are deletion of files. - * We can move the files to a "TO_BE_DELETED" folder before asychronously + * We can move the files to a "toBeDeleted" folder before asychronously * deleting it, to make sure the caller can run it faster. * + * Users should not write files into the "toBeDeleted" folder, otherwise + * the files can be gone any time we restart the MRAsyncDiskService. + * * This class also contains all operations that will be performed by the * thread pools. */ +@InterfaceAudience.Private public class MRAsyncDiskService { public static final Log LOG = LogFactory.getLog(MRAsyncDiskService.class); AsyncDiskService asyncDiskService; + public static final String TOBEDELETED = "toBeDeleted"; + /** * Create a AsyncDiskServices with a set of volumes (specified by their * root directories). @@ -57,21 +67,53 @@ * @param localFileSystem The localFileSystem used for deletions. * @param volumes The roots of the file system volumes. 
*/ - public MRAsyncDiskService(FileSystem localFileSystem, String[] volumes) throws IOException { - - asyncDiskService = new AsyncDiskService(volumes); + public MRAsyncDiskService(FileSystem localFileSystem, String[] volumes) + throws IOException { + this.volumes = new String[volumes.length]; + for (int v = 0; v < volumes.length; v++) { + this.volumes[v] = normalizePath(volumes[v]); + } this.localFileSystem = localFileSystem; - this.volumes = volumes; + + asyncDiskService = new AsyncDiskService(this.volumes); // Create one ThreadPool per volume for (int v = 0 ; v < volumes.length; v++) { // Create the root for file deletion - if (!localFileSystem.mkdirs(new Path(volumes[v], SUBDIR))) { - throw new IOException("Cannot create " + SUBDIR + " in " + volumes[v]); + Path absoluteSubdir = new Path(volumes[v], TOBEDELETED); + if (!localFileSystem.mkdirs(absoluteSubdir)) { + throw new IOException("Cannot create " + TOBEDELETED + " in " + volumes[v]); } } + // Create tasks to delete the paths inside the volumes + for (int v = 0 ; v < volumes.length; v++) { + Path absoluteSubdir = new Path(volumes[v], TOBEDELETED); + // List all files inside the volumes + FileStatus[] files = localFileSystem.listStatus(absoluteSubdir); + for (int f = 0; f < files.length; f++) { + // Get the relative file name to the root of the volume + String absoluteFilename = files[f].getPath().toUri().getPath(); + String relative = getRelativePathName(absoluteFilename, volumes[v]); + if (relative == null) { + // This should never happen + throw new IOException("Cannot delete " + absoluteFilename + + " because it's outside of " + volumes[v]); + } + DeleteTask task = new DeleteTask(volumes[v], absoluteFilename, + relative); + execute(volumes[v], task); + } + } + } + + /** + * Initialize MRAsyncDiskService based on conf. + * @param conf local file system and local dirs will be read from conf + */ + public MRAsyncDiskService(JobConf conf) throws IOException { + this(FileSystem.getLocal(conf), conf.getLocalDirs()); } /** @@ -84,7 +126,7 @@ /** * Gracefully start the shut down of all ThreadPools. */ - synchronized void shutdown() { + public synchronized void shutdown() { asyncDiskService.shutdown(); } @@ -99,7 +141,7 @@ * Wait for the termination of the thread pools. * * @param milliseconds The number of milliseconds to wait - * @return true if all thread pools are terminated without time limit + * @return true if all thread pools are terminated within time limit * @throws InterruptedException */ public synchronized boolean awaitTermination(long milliseconds) @@ -107,15 +149,13 @@ return asyncDiskService.awaitTermination(milliseconds); } - public static final String SUBDIR = "toBeDeleted"; - private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss.SSS"); private FileSystem localFileSystem; private String[] volumes; - private int uniqueId = 0; + private static AtomicLong uniqueId = new AtomicLong(0); /** A task for deleting a pathName from a volume. */ @@ -133,7 +173,7 @@ * @param volume The volume that the file/dir is in. * @param originalPath The original name, relative to volume root. * @param pathToBeDeleted The name after the move, relative to volume root, - * containing SUBDIR. + * containing TOBEDELETED. 
*/ DeleteTask(String volume, String originalPath, String pathToBeDeleted) { this.volume = volume; @@ -161,7 +201,8 @@ if (!success) { if (e != null) { - LOG.warn("Failure in " + this + " with exception " + StringUtils.stringifyException(e)); + LOG.warn("Failure in " + this + " with exception " + + StringUtils.stringifyException(e)); } else { LOG.warn("Failure in " + this); } @@ -181,23 +222,34 @@ * won't see the path name under the old name anyway after the move. * * @param volume The disk volume - * @param pathName The path name relative to volume. + * @param pathName The path name relative to volume root. * @throws IOException If the move failed + * @return false if the file is not found */ - public boolean moveAndDelete(String volume, String pathName) throws IOException { + public boolean moveAndDeleteRelativePath(String volume, String pathName) + throws IOException { + + volume = normalizePath(volume); + // Move the file right now, so that it can be deleted later - String newPathName; - synchronized (this) { - newPathName = format.format(new Date()) + "_" + uniqueId; - uniqueId ++; - } - newPathName = SUBDIR + Path.SEPARATOR_CHAR + newPathName; + String newPathName = + format.format(new Date()) + "_" + uniqueId.getAndIncrement(); + newPathName = TOBEDELETED + Path.SEPARATOR_CHAR + newPathName; Path source = new Path(volume, pathName); Path target = new Path(volume, newPathName); try { if (!localFileSystem.rename(source, target)) { - return false; + // Try to recreate the parent directory just in case it gets deleted. + if (!localFileSystem.mkdirs(new Path(volume, TOBEDELETED))) { + throw new IOException("Cannot create " + TOBEDELETED + " under " + + volume); + } + // Try rename again. If it fails, return false. + if (!localFileSystem.rename(source, target)) { + throw new IOException("Cannot rename " + source + " to " + + target); + } } } catch (FileNotFoundException e) { // Return false in case that the file is not found. @@ -217,13 +269,99 @@ * deletions are done. This is usually good enough because applications * won't see the path name under the old name anyway after the move. * - * @param pathName The path name on each volume. - * @throws IOException If the move failed + * @param pathName The path name relative to each volume root + * @throws IOException If any of the move failed + * @return false If any of the target pathName did not exist, + * note that the operation is still done on all volumes. */ - public void moveAndDeleteFromEachVolume(String pathName) throws IOException { + public boolean moveAndDeleteFromEachVolume(String pathName) throws IOException { + boolean result = true; for (int i = 0; i < volumes.length; i++) { - moveAndDelete(volumes[i], pathName); + result = result && moveAndDeleteRelativePath(volumes[i], pathName); } + return result; + } + + /** + * Move all files/directories inside volume into TOBEDELETED, and then + * delete them. The TOBEDELETED directory itself is ignored. 
+ */ + public void cleanupAllVolumes() throws IOException { + for (int v = 0; v < volumes.length; v++) { + // List all files inside the volumes + FileStatus[] files = localFileSystem.listStatus(new Path(volumes[v])); + for (int f = 0; f < files.length; f++) { + // Get the relative file name to the root of the volume + String absoluteFilename = files[f].getPath().toUri().getPath(); + String relative = getRelativePathName(absoluteFilename, volumes[v]); + if (relative == null) { + // This should never happen + throw new IOException("Cannot delete " + absoluteFilename + + " because it's outside of " + volumes[v]); + } + // Do not delete the current TOBEDELETED + if (!TOBEDELETED.equals(relative)) { + moveAndDeleteRelativePath(volumes[v], relative); + } + } + } + } + + /** + * Returns the normalized path of a path. + */ + private static String normalizePath(String path) { + return (new Path(path)).toUri().getPath(); + } + + /** + * Get the relative path name with respect to the root of the volume. + * @param absolutePathName The absolute path name + * @param volume Root of the volume. + * @return null if the absolute path name is outside of the volume. + */ + private static String getRelativePathName(String absolutePathName, + String volume) { + + absolutePathName = normalizePath(absolutePathName); + // Get the file names + if (!absolutePathName.startsWith(volume)) { + return null; + } + // Get rid of the volume prefix + String fileName = absolutePathName.substring(volume.length()); + if (fileName.charAt(0) == Path.SEPARATOR_CHAR) { + fileName = fileName.substring(1); + } + return fileName; + } + + /** + * Move the path name to a temporary location and then delete it. + * + * Note that if there is no volume that contains this path, the path + * will stay as it is, and the function will return false. + * + * This functions returns when the moves are done, but not necessarily all + * deletions are done. This is usually good enough because applications + * won't see the path name under the old name anyway after the move. 
+ * + * @param absolutePathName The path name from root "/" + * @throws IOException If the move failed + * @return false if we are unable to move the path name + */ + public boolean moveAndDeleteAbsolutePath(String absolutePathName) + throws IOException { + + for (int v = 0; v < volumes.length; v++) { + String relative = getRelativePathName(absolutePathName, volumes[v]); + if (relative != null) { + return moveAndDeleteRelativePath(volumes[v], relative); + } + } + + throw new IOException("Cannot delete " + absolutePathName + + " because it's outside of all volumes."); } } Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java?rev=898486&r1=898485&r2=898486&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java Tue Jan 12 19:49:41 2010 @@ -18,11 +18,14 @@ package org.apache.hadoop.mapreduce.util; import java.io.File; +import java.io.IOException; + import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.util.MRAsyncDiskService; +import org.junit.Test; /** * A test for MRAsyncDiskService. @@ -33,10 +36,10 @@ "test.build.data", "/tmp")).toString(); /** - * This test creates one empty directory, and one directory with content, - * and then removes them through MRAsyncDiskService. - * @throws Throwable + * This test creates some directories and then removes them through + * MRAsyncDiskService. */ + @Test public void testMRAsyncDiskService() throws Throwable { FileSystem localFileSystem = FileSystem.getLocal(new Configuration()); @@ -48,27 +51,139 @@ String a = "a"; String b = "b"; String c = "b/c"; + String d = "d"; File fa = new File(vols[0], a); File fb = new File(vols[1], b); File fc = new File(vols[1], c); + File fd = new File(vols[1], d); // Create the directories fa.mkdirs(); fb.mkdirs(); fc.mkdirs(); + fd.mkdirs(); assertTrue(fa.exists()); assertTrue(fb.exists()); assertTrue(fc.exists()); + assertTrue(fd.exists()); // Move and delete them - service.moveAndDelete(vols[0], a); + service.moveAndDeleteRelativePath(vols[0], a); assertFalse(fa.exists()); - service.moveAndDelete(vols[1], b); + service.moveAndDeleteRelativePath(vols[1], b); assertFalse(fb.exists()); assertFalse(fc.exists()); + // asyncDiskService is NOT able to delete files outside all volumes. + IOException ee = null; + try { + service.moveAndDeleteAbsolutePath(TEST_ROOT_DIR + "/2"); + } catch (IOException e) { + ee = e; + } + assertNotNull("asyncDiskService should not be able to delete files " + + "outside all volumes", ee); + // asyncDiskService is able to automatically find the file in one + // of the volumes. + assertTrue(service.moveAndDeleteAbsolutePath(vols[1] + Path.SEPARATOR_CHAR + d)); + + // Make sure everything is cleaned up + makeSureCleanedUp(vols, service); + } + + /** + * This test creates some directories inside the volume roots, and then + * call asyncDiskService.MoveAndDeleteAllVolumes. + * We should be able to delete all files/dirs inside the volumes except + * the toBeDeleted directory. 
+ */ + @Test + public void testMRAsyncDiskServiceMoveAndDeleteAllVolumes() throws Throwable { + FileSystem localFileSystem = FileSystem.getLocal(new Configuration()); + String[] vols = new String[]{TEST_ROOT_DIR + "/0", + TEST_ROOT_DIR + "/1"}; + MRAsyncDiskService service = new MRAsyncDiskService( + localFileSystem, vols); + + String a = "a"; + String b = "b"; + String c = "b/c"; + String d = "d"; + + File fa = new File(vols[0], a); + File fb = new File(vols[1], b); + File fc = new File(vols[1], c); + File fd = new File(vols[1], d); + + // Create the directories + fa.mkdirs(); + fb.mkdirs(); + fc.mkdirs(); + fd.mkdirs(); + + assertTrue(fa.exists()); + assertTrue(fb.exists()); + assertTrue(fc.exists()); + assertTrue(fd.exists()); + + // Delete all of them + service.cleanupAllVolumes(); + + assertFalse(fa.exists()); + assertFalse(fb.exists()); + assertFalse(fc.exists()); + assertFalse(fd.exists()); + + // Make sure everything is cleaned up + makeSureCleanedUp(vols, service); + } + + /** + * This test creates some directories inside the toBeDeleted directory and + * then start the asyncDiskService. + * AsyncDiskService will create tasks to delete the content inside the + * toBeDeleted directories. + */ + @Test + public void testMRAsyncDiskServiceStartupCleaning() throws Throwable { + FileSystem localFileSystem = FileSystem.getLocal(new Configuration()); + String[] vols = new String[]{TEST_ROOT_DIR + "/0", + TEST_ROOT_DIR + "/1"}; + + String a = "a"; + String b = "b"; + String c = "b/c"; + String d = "d"; + + // Create directories inside SUBDIR + File fa = new File(vols[0] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, a); + File fb = new File(vols[1] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, b); + File fc = new File(vols[1] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, c); + File fd = new File(vols[1] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, d); + + // Create the directories + fa.mkdirs(); + fb.mkdirs(); + fc.mkdirs(); + fd.mkdirs(); + + assertTrue(fa.exists()); + assertTrue(fb.exists()); + assertTrue(fc.exists()); + assertTrue(fd.exists()); + + // Create the asyncDiskService which will delete all contents inside SUBDIR + MRAsyncDiskService service = new MRAsyncDiskService( + localFileSystem, vols); + + // Make sure everything is cleaned up + makeSureCleanedUp(vols, service); + } + + private void makeSureCleanedUp(String[] vols, MRAsyncDiskService service) + throws Throwable { // Sleep at most 5 seconds to make sure the deleted items are all gone. service.shutdown(); if (!service.awaitTermination(5000)) { @@ -76,12 +191,18 @@ } // All contents should be gone by now. - for (int i = 0; i < 2; i++) { - File toBeDeletedDir = new File(vols[0], MRAsyncDiskService.SUBDIR); + for (int i = 0; i < vols.length; i++) { + File subDir = new File(vols[0]); + String[] subDirContent = subDir.list(); + assertEquals("Volume should contain a single child: " + + MRAsyncDiskService.TOBEDELETED, 1, subDirContent.length); + + File toBeDeletedDir = new File(vols[0], MRAsyncDiskService.TOBEDELETED); String[] content = toBeDeletedDir.list(); assertNotNull("Cannot find " + toBeDeletedDir, content); - assertEquals("" + toBeDeletedDir + " should be empty now.", - 0, content.length); + assertEquals("" + toBeDeletedDir + " should be empty now.", 0, + content.length); } } + }
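
For readers skimming the diff above: the core idea of MRAsyncDiskService is to make deletions cheap for the caller by renaming the target into a per-volume "toBeDeleted" directory and then deleting it on a background thread dedicated to that volume. What follows is a minimal, self-contained sketch of that move-then-delete-asynchronously pattern using only the JDK; the class and method names (AsyncDeleteSketch, moveAndDelete, deleteRecursively) are illustrative stand-ins, not the committed Hadoop API.

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Illustrative sketch (not the committed Hadoop class): one single-thread
 * executor per volume; deletions are made cheap for the caller by renaming
 * the target into a per-volume "toBeDeleted" directory and deleting it in
 * the background.
 */
public class AsyncDeleteSketch {

  private static final String TO_BE_DELETED = "toBeDeleted";
  private final Map<String, ExecutorService> pools = new HashMap<>();
  private final AtomicLong uniqueId = new AtomicLong(0);
  private final SimpleDateFormat format =
      new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss.SSS");

  public AsyncDeleteSketch(String[] volumes) throws IOException {
    for (String volume : volumes) {
      File trash = new File(volume, TO_BE_DELETED);
      if (!trash.isDirectory() && !trash.mkdirs()) {
        throw new IOException("Cannot create " + trash);
      }
      // One deletion thread per volume, mirroring the per-volume thread
      // pools described in the patch.
      pools.put(volume, Executors.newSingleThreadExecutor());
    }
  }

  /**
   * Rename pathName (relative to volume) into toBeDeleted, then delete it
   * asynchronously. Returns false if the source does not exist.
   * The volume must be one of the roots passed to the constructor; unlike
   * the committed code, no path normalization is attempted here.
   */
  public boolean moveAndDelete(String volume, String pathName)
      throws IOException {
    File source = new File(volume, pathName);
    if (!source.exists()) {
      return false;
    }
    File target = new File(new File(volume, TO_BE_DELETED),
        format.format(new Date()) + "_" + uniqueId.getAndIncrement());
    if (!source.renameTo(target)) {
      throw new IOException("Cannot rename " + source + " to " + target);
    }
    // The caller returns as soon as the rename is done; the actual
    // (possibly slow, recursive) delete happens on the volume's thread.
    pools.get(volume).execute(() -> deleteRecursively(target));
    return true;
  }

  private static void deleteRecursively(File f) {
    File[] children = f.listFiles();
    if (children != null) {
      for (File c : children) {
        deleteRecursively(c);
      }
    }
    f.delete();
  }

  /** Stop accepting work and wait up to timeoutMillis for pending deletes. */
  public void shutdown(long timeoutMillis) throws InterruptedException {
    for (ExecutorService pool : pools.values()) {
      pool.shutdown();
    }
    for (ExecutorService pool : pools.values()) {
      pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }
  }
}

A caller in the role of the TaskTracker would build one instance over its local directories at startup, call moveAndDelete() wherever it previously deleted in place, and drain the pools at shutdown, much as the patch does with cleanupAllVolumes() followed by shutdown() and awaitTermination().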
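
The TrackerDistributedCacheManager change follows a complementary pattern: try the async service first and fall back to a synchronous delete when no service is configured or nothing was moved. Below is a hedged companion sketch of that fallback, written against the AsyncDeleteSketch class from the previous example; deleteLocalPath here is a simplified stand-in for the private helper the patch adds, not its exact signature (the committed helper takes an absolute path and a LocalFileSystem).

import java.io.File;
import java.io.IOException;

/**
 * Companion sketch to AsyncDeleteSketch above: mirrors the fallback logic of
 * the deleteLocalPath() helper added to TrackerDistributedCacheManager.
 * Prefer the asynchronous service; delete synchronously when it is absent.
 */
public class DeleteWithFallbackSketch {

  public static void deleteLocalPath(AsyncDeleteSketch service,
                                     String volume,
                                     String relativePath) throws IOException {
    boolean deleted = false;
    if (service != null) {
      // Fast path: rename into toBeDeleted and let the volume's background
      // thread do the actual removal.
      deleted = service.moveAndDelete(volume, relativePath);
    }
    if (!deleted) {
      // Slow path: no async service (or nothing to move), so delete in place
      // synchronously, as the pre-patch code did.
      deleteRecursively(new File(volume, relativePath));
    }
  }

  private static void deleteRecursively(File f) {
    File[] children = f.listFiles();
    if (children != null) {
      for (File c : children) {
        deleteRecursively(c);
      }
    }
    f.delete();
  }
}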