hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhr...@apache.org
Subject svn commit: r898486 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/filecache/ src/java/org/apache/hadoop/mapreduce/util/ src/test/mapred/org/apache/hadoop/mapreduce/util/
Date Tue, 12 Jan 2010 19:49:42 GMT
Author: dhruba
Date: Tue Jan 12 19:49:41 2010
New Revision: 898486

URL: http://svn.apache.org/viewvc?rev=898486&view=rev
Log:
MAPREDUCE-1302. TrackerDistributedCacheManager deletes file
asynchronously, thus reducing task initialization delays.
(Zheng Shao via dhruba)


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jan 12 19:49:41 2010
@@ -107,6 +107,10 @@
     comparisons, permitting non-Writable intermediate data.
     (Aaron Kimball via cutting)
 
+    MAPREDUCE-1302. TrackerDistributedCacheManager deletes file
+    asynchronously, thus reducing task initialization delays.
+    (Zheng Shao via dhruba)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Tue Jan 12 19:49:41 2010
@@ -456,6 +456,11 @@
     return getStrings(MRConfig.LOCAL_DIR);
   }
 
+  /**
+   * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
+   * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes()
+   */
+  @Deprecated
   public void deleteLocalFiles() throws IOException {
     String[] localDirs = getLocalDirs();
     for (int i = 0; i < localDirs.length; i++) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jan 12 19:49:41 2010
@@ -90,7 +90,6 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ConfiguredPolicy;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -566,10 +565,11 @@
        fConf.get(TT_DNS_NAMESERVER,"default"));
     }
  
-    //check local disk and start async disk service
+    // Check local disk, start async disk service, and clean up all 
+    // local directories.
     checkLocalDirs(this.fConf.getLocalDirs());
-    asyncDiskService = new MRAsyncDiskService(FileSystem.getLocal(fConf), fConf.getLocalDirs());
-    asyncDiskService.moveAndDeleteFromEachVolume(SUBDIR);
+    asyncDiskService = new MRAsyncDiskService(fConf);
+    asyncDiskService.cleanupAllVolumes();
 
     // Clear out state tables
     this.tasks.clear();
@@ -640,12 +640,14 @@
     taskController = (TaskController) ReflectionUtils.newInstance(
         taskControllerClass, fConf);
 
+
     // setup and create jobcache directory with appropriate permissions
     taskController.setup();
 
     // Initialize DistributedCache
-    this.distributedCacheManager = new TrackerDistributedCacheManager(
-        this.fConf, taskController);
+    this.distributedCacheManager = 
+        new TrackerDistributedCacheManager(this.fConf, taskController,
+        asyncDiskService);
 
     this.jobClient = (InterTrackerProtocol) 
       RPC.waitForProxy(InterTrackerProtocol.class,
@@ -694,10 +696,14 @@
         t, TaskTrackerInstrumentation.class);
   }
   
-  /** 
-   * Removes all contents of temporary storage.  Called upon 
+  /**
+   * Removes all contents of temporary storage.  Called upon
    * startup, to remove any leftovers from previous run.
+   * 
+   * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead.
+   * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes()
    */
+  @Deprecated
   public void cleanupStorage() throws IOException {
     this.fConf.deleteLocalFiles();
   }
@@ -1092,9 +1098,23 @@
 
     this.running = false;
 
-    // Clear local storage
-    cleanupStorage();
-        
+    if (asyncDiskService != null) {
+      // Clear local storage
+      asyncDiskService.cleanupAllVolumes();
+      
+      // Shutdown all async deletion threads with up to 10 seconds of delay
+      asyncDiskService.shutdown();
+      try {
+        if (!asyncDiskService.awaitTermination(10000)) {
+          asyncDiskService.shutdownNow();
+          asyncDiskService = null;
+        }
+      } catch (InterruptedException e) {
+        asyncDiskService.shutdownNow();
+        asyncDiskService = null;
+      }
+    }
+    
     // Shutdown the fetcher thread
     this.mapEventsFetcher.interrupt();
     

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Jan 12 19:49:41 2010
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -77,6 +78,8 @@
   
   private Random random = new Random();
 
+  private MRAsyncDiskService asyncDiskService;
+  
   public TrackerDistributedCacheManager(Configuration conf,
       TaskController taskController) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
@@ -86,6 +89,18 @@
   }
 
   /**
+   * Creates a TrackerDistributedCacheManager with a MRAsyncDiskService.
+   * @param asyncDiskService Provides a set of ThreadPools for async disk 
+   *                         operations.  
+   */
+  public TrackerDistributedCacheManager(Configuration conf,
+      TaskController taskController, MRAsyncDiskService asyncDiskService)
+      throws IOException {
+    this(conf, taskController);
+    this.asyncDiskService = asyncDiskService;
+  }
+
+  /**
    * Get the locally cached file or archive; it could either be
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    *
@@ -252,8 +267,8 @@
     // do the deletion, after releasing the global lock
     for (CacheStatus lcacheStatus : deleteSet) {
       synchronized (lcacheStatus) {
-        FileSystem.getLocal(conf).delete(lcacheStatus.localizedLoadPath, true);
-        LOG.info("Deleted path " + lcacheStatus.localizedLoadPath);
+        deleteLocalPath(asyncDiskService,
+            FileSystem.getLocal(conf), lcacheStatus.localizedLoadPath);
         // decrement the size of the cache from baseDirSize
         synchronized (baseDirSize) {
           Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
@@ -269,6 +284,30 @@
     }
   }
 
+  /**
+   * Delete a local path with asyncDiskService if available,
+   * or otherwise synchronously with local file system.
+   */
+  private static void deleteLocalPath(MRAsyncDiskService asyncDiskService,
+      LocalFileSystem fs, Path path) throws IOException {
+    boolean deleted = false;
+    if (asyncDiskService != null) {
+      // Try to delete using asyncDiskService
+      String localPathToDelete = 
+        path.toUri().getPath();
+      deleted = asyncDiskService.moveAndDeleteAbsolutePath(localPathToDelete);
+      if (!deleted) {
+        LOG.warn("Cannot find DistributedCache path " + localPathToDelete
+            + " on any of the asyncDiskService volumes!");
+      }
+    }
+    if (!deleted) {
+      // If no asyncDiskService, we will delete the files synchronously
+      fs.delete(path, true);
+    }
+    LOG.info("Deleted path " + path);
+  }
+  
   /*
    * Returns the relative path of the dir this cache will be localized in
    * relative path that this cache will be localized in. For
@@ -620,7 +659,7 @@
     synchronized (cachedArchives) {
       for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
         try {
-          localFs.delete(f.getValue().localizedLoadPath, true);
+          deleteLocalPath(asyncDiskService, localFs, f.getValue().localizedLoadPath);
         } catch (IOException ie) {
           LOG.debug("Error cleaning up cache", ie);
         }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java Tue Jan 12 19:49:41 2010
@@ -22,31 +22,41 @@
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.AsyncDiskService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
 
-/*
+/**
  * This class is a container of multiple thread pools, each for a volume,
  * so that we can schedule async disk operations easily.
  * 
  * Examples of async disk operations are deletion of files.
- * We can move the files to a "TO_BE_DELETED" folder before asychronously
+ * We can move the files to a "toBeDeleted" folder before asychronously
  * deleting it, to make sure the caller can run it faster.
  * 
+ * Users should not write files into the "toBeDeleted" folder, otherwise
+ * the files can be gone any time we restart the MRAsyncDiskService.  
+ * 
  * This class also contains all operations that will be performed by the
  * thread pools. 
  */
+@InterfaceAudience.Private
 public class MRAsyncDiskService {
   
   public static final Log LOG = LogFactory.getLog(MRAsyncDiskService.class);
   
   AsyncDiskService asyncDiskService;
   
+  public static final String TOBEDELETED = "toBeDeleted";
+  
   /**
    * Create a AsyncDiskServices with a set of volumes (specified by their
    * root directories).
@@ -57,21 +67,53 @@
    * @param localFileSystem The localFileSystem used for deletions.
    * @param volumes The roots of the file system volumes.
    */
-  public MRAsyncDiskService(FileSystem localFileSystem, String[] volumes) throws IOException {
-    
-    asyncDiskService = new AsyncDiskService(volumes);
+  public MRAsyncDiskService(FileSystem localFileSystem, String[] volumes)
+      throws IOException {
     
+    this.volumes = new String[volumes.length];
+    for (int v = 0; v < volumes.length; v++) {
+      this.volumes[v] = normalizePath(volumes[v]);
+    }  
     this.localFileSystem = localFileSystem;
-    this.volumes = volumes;
+    
+    asyncDiskService = new AsyncDiskService(this.volumes);
     
     // Create one ThreadPool per volume
     for (int v = 0 ; v < volumes.length; v++) {
       // Create the root for file deletion
-      if (!localFileSystem.mkdirs(new Path(volumes[v], SUBDIR))) {
-        throw new IOException("Cannot create " + SUBDIR + " in " + volumes[v]);
+      Path absoluteSubdir = new Path(volumes[v], TOBEDELETED);
+      if (!localFileSystem.mkdirs(absoluteSubdir)) {
+        throw new IOException("Cannot create " + TOBEDELETED + " in " + volumes[v]);
       }
     }
     
+    // Create tasks to delete the paths inside the volumes
+    for (int v = 0 ; v < volumes.length; v++) {
+      Path absoluteSubdir = new Path(volumes[v], TOBEDELETED);
+      // List all files inside the volumes
+      FileStatus[] files = localFileSystem.listStatus(absoluteSubdir);
+      for (int f = 0; f < files.length; f++) {
+        // Get the relative file name to the root of the volume
+        String absoluteFilename = files[f].getPath().toUri().getPath();
+        String relative = getRelativePathName(absoluteFilename, volumes[v]);
+        if (relative == null) {
+          // This should never happen
+          throw new IOException("Cannot delete " + absoluteFilename
+              + " because it's outside of " + volumes[v]);
+        }
+        DeleteTask task = new DeleteTask(volumes[v], absoluteFilename,
+            relative);
+        execute(volumes[v], task);
+      }
+    }
+  }
+  
+  /**
+   * Initialize MRAsyncDiskService based on conf.
+   * @param conf  local file system and local dirs will be read from conf 
+   */
+  public MRAsyncDiskService(JobConf conf) throws IOException {
+    this(FileSystem.getLocal(conf), conf.getLocalDirs());
   }
   
   /**
@@ -84,7 +126,7 @@
   /**
    * Gracefully start the shut down of all ThreadPools.
    */
-  synchronized void shutdown() {
+  public synchronized void shutdown() {
     asyncDiskService.shutdown();
   }
 
@@ -99,7 +141,7 @@
    * Wait for the termination of the thread pools.
    * 
    * @param milliseconds  The number of milliseconds to wait
-   * @return   true if all thread pools are terminated without time limit
+   * @return   true if all thread pools are terminated within time limit
    * @throws InterruptedException 
    */
   public synchronized boolean awaitTermination(long milliseconds) 
@@ -107,15 +149,13 @@
     return asyncDiskService.awaitTermination(milliseconds);
   }
   
-  public static final String SUBDIR = "toBeDeleted";
-  
   private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss.SSS");
   
   private FileSystem localFileSystem;
   
   private String[] volumes; 
                  
-  private int uniqueId = 0;
+  private static AtomicLong uniqueId = new AtomicLong(0);
   
   /** A task for deleting a pathName from a volume.
    */
@@ -133,7 +173,7 @@
      * @param volume        The volume that the file/dir is in.
      * @param originalPath  The original name, relative to volume root.
      * @param pathToBeDeleted  The name after the move, relative to volume root,
-     *                         containing SUBDIR.
+     *                         containing TOBEDELETED.
      */
     DeleteTask(String volume, String originalPath, String pathToBeDeleted) {
       this.volume = volume;
@@ -161,7 +201,8 @@
       
       if (!success) {
         if (e != null) {
-          LOG.warn("Failure in " + this + " with exception " + StringUtils.stringifyException(e));
+          LOG.warn("Failure in " + this + " with exception "
+              + StringUtils.stringifyException(e));
         } else {
           LOG.warn("Failure in " + this);
         }
@@ -181,23 +222,34 @@
    * won't see the path name under the old name anyway after the move. 
    * 
    * @param volume       The disk volume
-   * @param pathName     The path name relative to volume.
+   * @param pathName     The path name relative to volume root.
    * @throws IOException If the move failed 
+   * @return   false     if the file is not found
    */
-  public boolean moveAndDelete(String volume, String pathName) throws IOException {
+  public boolean moveAndDeleteRelativePath(String volume, String pathName)
+      throws IOException {
+    
+    volume = normalizePath(volume);
+    
     // Move the file right now, so that it can be deleted later
-    String newPathName;
-    synchronized (this) {
-      newPathName = format.format(new Date()) + "_" + uniqueId;
-      uniqueId ++;
-    }
-    newPathName = SUBDIR + Path.SEPARATOR_CHAR + newPathName;
+    String newPathName = 
+        format.format(new Date()) + "_" + uniqueId.getAndIncrement();
+    newPathName = TOBEDELETED + Path.SEPARATOR_CHAR + newPathName;
     
     Path source = new Path(volume, pathName);
     Path target = new Path(volume, newPathName); 
     try {
       if (!localFileSystem.rename(source, target)) {
-        return false;
+        // Try to recreate the parent directory just in case it gets deleted.
+        if (!localFileSystem.mkdirs(new Path(volume, TOBEDELETED))) {
+          throw new IOException("Cannot create " + TOBEDELETED + " under "
+              + volume);
+        }
+        // Try rename again. If it fails, return false.
+        if (!localFileSystem.rename(source, target)) {
+          throw new IOException("Cannot rename " + source + " to "
+              + target);
+        }
       }
     } catch (FileNotFoundException e) {
       // Return false in case that the file is not found.  
@@ -217,13 +269,99 @@
    * deletions are done. This is usually good enough because applications 
    * won't see the path name under the old name anyway after the move. 
    * 
-   * @param pathName     The path name on each volume.
-   * @throws IOException If the move failed 
+   * @param pathName     The path name relative to each volume root
+   * @throws IOException If any of the move failed 
+   * @return   false     If any of the target pathName did not exist,
+   *                     note that the operation is still done on all volumes.
    */
-  public void moveAndDeleteFromEachVolume(String pathName) throws IOException {
+  public boolean moveAndDeleteFromEachVolume(String pathName) throws IOException {
+    boolean result = true; 
     for (int i = 0; i < volumes.length; i++) {
-      moveAndDelete(volumes[i], pathName);
+      result = result && moveAndDeleteRelativePath(volumes[i], pathName);
     }
+    return result;
+  }
+
+  /**
+   * Move all files/directories inside volume into TOBEDELETED, and then
+   * delete them.  The TOBEDELETED directory itself is ignored.
+   */
+  public void cleanupAllVolumes() throws IOException {
+    for (int v = 0; v < volumes.length; v++) {
+      // List all files inside the volumes
+      FileStatus[] files = localFileSystem.listStatus(new Path(volumes[v]));
+      for (int f = 0; f < files.length; f++) {
+        // Get the relative file name to the root of the volume
+        String absoluteFilename = files[f].getPath().toUri().getPath();
+        String relative = getRelativePathName(absoluteFilename, volumes[v]);
+        if (relative == null) {
+          // This should never happen
+          throw new IOException("Cannot delete " + absoluteFilename
+              + " because it's outside of " + volumes[v]);
+        }
+        // Do not delete the current TOBEDELETED
+        if (!TOBEDELETED.equals(relative)) {
+          moveAndDeleteRelativePath(volumes[v], relative);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Returns the normalized path of a path.
+   */
+  private static String normalizePath(String path) {
+    return (new Path(path)).toUri().getPath();
+  }
+  
+  /**
+   * Get the relative path name with respect to the root of the volume.
+   * @param absolutePathName The absolute path name
+   * @param volume Root of the volume.
+   * @return null if the absolute path name is outside of the volume.
+   */
+  private static String getRelativePathName(String absolutePathName,
+      String volume) {
+    
+    absolutePathName = normalizePath(absolutePathName);
+    // Get the file names
+    if (!absolutePathName.startsWith(volume)) {
+      return null;
+    }
+    // Get rid of the volume prefix
+    String fileName = absolutePathName.substring(volume.length());
+    if (fileName.charAt(0) == Path.SEPARATOR_CHAR) {
+      fileName = fileName.substring(1);
+    }
+    return fileName;
+  }
+  
+  /**
+   * Move the path name to a temporary location and then delete it.
+   * 
+   * Note that if there is no volume that contains this path, the path
+   * will stay as it is, and the function will return false.
+   *  
+   * This functions returns when the moves are done, but not necessarily all
+   * deletions are done. This is usually good enough because applications 
+   * won't see the path name under the old name anyway after the move. 
+   * 
+   * @param absolutePathName    The path name from root "/"
+   * @throws IOException        If the move failed
+   * @return   false if we are unable to move the path name
+   */
+  public boolean moveAndDeleteAbsolutePath(String absolutePathName)
+      throws IOException {
+    
+    for (int v = 0; v < volumes.length; v++) {
+      String relative = getRelativePathName(absolutePathName, volumes[v]);
+      if (relative != null) {
+        return moveAndDeleteRelativePath(volumes[v], relative);
+      }
+    }
+    
+    throw new IOException("Cannot delete " + absolutePathName
+        + " because it's outside of all volumes.");
   }
   
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java Tue Jan 12 19:49:41 2010
@@ -18,11 +18,14 @@
 package org.apache.hadoop.mapreduce.util;
 
 import java.io.File;
+import java.io.IOException;
+
 import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.junit.Test;
 
 /**
  * A test for MRAsyncDiskService.
@@ -33,10 +36,10 @@
       "test.build.data", "/tmp")).toString();
 
   /**
-   * This test creates one empty directory, and one directory with content, 
-   * and then removes them through MRAsyncDiskService. 
-   * @throws Throwable
+   * This test creates some directories and then removes them through 
+   * MRAsyncDiskService. 
    */
+  @Test
   public void testMRAsyncDiskService() throws Throwable {
   
     FileSystem localFileSystem = FileSystem.getLocal(new Configuration());
@@ -48,27 +51,139 @@
     String a = "a";
     String b = "b";
     String c = "b/c";
+    String d = "d";
     
     File fa = new File(vols[0], a);
     File fb = new File(vols[1], b);
     File fc = new File(vols[1], c);
+    File fd = new File(vols[1], d);
     
     // Create the directories
     fa.mkdirs();
     fb.mkdirs();
     fc.mkdirs();
+    fd.mkdirs();
     
     assertTrue(fa.exists());
     assertTrue(fb.exists());
     assertTrue(fc.exists());
+    assertTrue(fd.exists());
     
     // Move and delete them
-    service.moveAndDelete(vols[0], a);
+    service.moveAndDeleteRelativePath(vols[0], a);
     assertFalse(fa.exists());
-    service.moveAndDelete(vols[1], b);
+    service.moveAndDeleteRelativePath(vols[1], b);
     assertFalse(fb.exists());
     assertFalse(fc.exists());
     
+    // asyncDiskService is NOT able to delete files outside all volumes.
+    IOException ee = null;
+    try {
+      service.moveAndDeleteAbsolutePath(TEST_ROOT_DIR + "/2");
+    } catch (IOException e) {
+      ee = e;
+    }
+    assertNotNull("asyncDiskService should not be able to delete files "
+        + "outside all volumes", ee);
+    // asyncDiskService is able to automatically find the file in one
+    // of the volumes.
+    assertTrue(service.moveAndDeleteAbsolutePath(vols[1] + Path.SEPARATOR_CHAR + d));
+    
+    // Make sure everything is cleaned up
+    makeSureCleanedUp(vols, service);
+  }
+
+  /**
+   * This test creates some directories inside the volume roots, and then 
+   * call asyncDiskService.MoveAndDeleteAllVolumes.
+   * We should be able to delete all files/dirs inside the volumes except
+   * the toBeDeleted directory.
+   */
+  @Test
+  public void testMRAsyncDiskServiceMoveAndDeleteAllVolumes() throws Throwable {
+    FileSystem localFileSystem = FileSystem.getLocal(new Configuration());
+    String[] vols = new String[]{TEST_ROOT_DIR + "/0",
+        TEST_ROOT_DIR + "/1"};
+    MRAsyncDiskService service = new MRAsyncDiskService(
+        localFileSystem, vols);
+
+    String a = "a";
+    String b = "b";
+    String c = "b/c";
+    String d = "d";
+    
+    File fa = new File(vols[0], a);
+    File fb = new File(vols[1], b);
+    File fc = new File(vols[1], c);
+    File fd = new File(vols[1], d);
+    
+    // Create the directories
+    fa.mkdirs();
+    fb.mkdirs();
+    fc.mkdirs();
+    fd.mkdirs();
+
+    assertTrue(fa.exists());
+    assertTrue(fb.exists());
+    assertTrue(fc.exists());
+    assertTrue(fd.exists());
+    
+    // Delete all of them
+    service.cleanupAllVolumes();
+    
+    assertFalse(fa.exists());
+    assertFalse(fb.exists());
+    assertFalse(fc.exists());
+    assertFalse(fd.exists());
+    
+    // Make sure everything is cleaned up
+    makeSureCleanedUp(vols, service);
+  }
+  
+  /**
+   * This test creates some directories inside the toBeDeleted directory and
+   * then start the asyncDiskService.
+   * AsyncDiskService will create tasks to delete the content inside the
+   * toBeDeleted directories.
+   */
+  @Test
+  public void testMRAsyncDiskServiceStartupCleaning() throws Throwable {
+    FileSystem localFileSystem = FileSystem.getLocal(new Configuration());
+    String[] vols = new String[]{TEST_ROOT_DIR + "/0",
+        TEST_ROOT_DIR + "/1"};
+
+    String a = "a";
+    String b = "b";
+    String c = "b/c";
+    String d = "d";
+    
+    // Create directories inside SUBDIR
+    File fa = new File(vols[0] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, a);
+    File fb = new File(vols[1] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, b);
+    File fc = new File(vols[1] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, c);
+    File fd = new File(vols[1] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, d);
+    
+    // Create the directories
+    fa.mkdirs();
+    fb.mkdirs();
+    fc.mkdirs();
+    fd.mkdirs();
+
+    assertTrue(fa.exists());
+    assertTrue(fb.exists());
+    assertTrue(fc.exists());
+    assertTrue(fd.exists());
+    
+    // Create the asyncDiskService which will delete all contents inside SUBDIR
+    MRAsyncDiskService service = new MRAsyncDiskService(
+        localFileSystem, vols);
+    
+    // Make sure everything is cleaned up
+    makeSureCleanedUp(vols, service);
+  }
+  
+  private void makeSureCleanedUp(String[] vols, MRAsyncDiskService service)
+      throws Throwable {
     // Sleep at most 5 seconds to make sure the deleted items are all gone.
     service.shutdown();
     if (!service.awaitTermination(5000)) {
@@ -76,12 +191,18 @@
     }
     
     // All contents should be gone by now.
-    for (int i = 0; i < 2; i++) {
-      File toBeDeletedDir = new File(vols[0], MRAsyncDiskService.SUBDIR);
+    for (int i = 0; i < vols.length; i++) {
+      File subDir = new File(vols[0]);
+      String[] subDirContent = subDir.list();
+      assertEquals("Volume should contain a single child: "
+          + MRAsyncDiskService.TOBEDELETED, 1, subDirContent.length);
+      
+      File toBeDeletedDir = new File(vols[0], MRAsyncDiskService.TOBEDELETED);
       String[] content = toBeDeletedDir.list();
       assertNotNull("Cannot find " + toBeDeletedDir, content);
-      assertEquals("" + toBeDeletedDir + " should be empty now.", 
-          0, content.length);
+      assertEquals("" + toBeDeletedDir + " should be empty now.", 0,
+          content.length);
     }
   }
+    
 }



Mime
View raw message