Author: dhruba
Date: Tue Jan 12 19:49:41 2010
New Revision: 898486
URL: http://svn.apache.org/viewvc?rev=898486&view=rev
Log:
MAPREDUCE-1302. TrackerDistributedCacheManager deletes file
asynchronously, thus reducing task initialization delays.
(Zheng Shao via dhruba)
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jan 12 19:49:41 2010
@@ -107,6 +107,10 @@
comparisons, permitting non-Writable intermediate data.
(Aaron Kimball via cutting)
+ MAPREDUCE-1302. TrackerDistributedCacheManager deletes file
+ asynchronously, thus reducing task initialization delays.
+ (Zheng Shao via dhruba)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Tue Jan 12 19:49:41 2010
@@ -456,6 +456,11 @@
return getStrings(MRConfig.LOCAL_DIR);
}
+ /**
+ * Use MRAsyncDiskService.cleanupAllVolumes instead.
+ * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes()
+ */
+ @Deprecated
public void deleteLocalFiles() throws IOException {
String[] localDirs = getLocalDirs();
for (int i = 0; i < localDirs.length; i++) {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jan 12 19:49:41 2010
@@ -90,7 +90,6 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ConfiguredPolicy;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -566,10 +565,11 @@
fConf.get(TT_DNS_NAMESERVER,"default"));
}
- //check local disk and start async disk service
+ // Check local disk, start async disk service, and clean up all
+ // local directories.
checkLocalDirs(this.fConf.getLocalDirs());
- asyncDiskService = new MRAsyncDiskService(FileSystem.getLocal(fConf), fConf.getLocalDirs());
- asyncDiskService.moveAndDeleteFromEachVolume(SUBDIR);
+ asyncDiskService = new MRAsyncDiskService(fConf);
+ asyncDiskService.cleanupAllVolumes();
// Clear out state tables
this.tasks.clear();
@@ -640,12 +640,14 @@
taskController = (TaskController) ReflectionUtils.newInstance(
taskControllerClass, fConf);
+
// setup and create jobcache directory with appropriate permissions
taskController.setup();
// Initialize DistributedCache
- this.distributedCacheManager = new TrackerDistributedCacheManager(
- this.fConf, taskController);
+ this.distributedCacheManager =
+ new TrackerDistributedCacheManager(this.fConf, taskController,
+ asyncDiskService);
this.jobClient = (InterTrackerProtocol)
RPC.waitForProxy(InterTrackerProtocol.class,
@@ -694,10 +696,14 @@
t, TaskTrackerInstrumentation.class);
}
- /**
- * Removes all contents of temporary storage. Called upon
+ /**
+ * Removes all contents of temporary storage. Called upon
* startup, to remove any leftovers from previous run.
+ *
+ * Use MRAsyncDiskService.cleanupAllVolumes instead.
+ * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes()
*/
+ @Deprecated
public void cleanupStorage() throws IOException {
this.fConf.deleteLocalFiles();
}
@@ -1092,9 +1098,23 @@
this.running = false;
- // Clear local storage
- cleanupStorage();
-
+ if (asyncDiskService != null) {
+ // Clear local storage
+ asyncDiskService.cleanupAllVolumes();
+
+ // Shutdown all async deletion threads with up to 10 seconds of delay
+ asyncDiskService.shutdown();
+ try {
+ if (!asyncDiskService.awaitTermination(10000)) {
+ asyncDiskService.shutdownNow();
+ asyncDiskService = null;
+ }
+ } catch (InterruptedException e) {
+ asyncDiskService.shutdownNow();
+ asyncDiskService = null;
+ }
+ }
+
// Shutdown the fetcher thread
this.mapEventsFetcher.interrupt();
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Jan 12 19:49:41 2010
@@ -35,6 +35,7 @@
import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -77,6 +78,8 @@
private Random random = new Random();
+ private MRAsyncDiskService asyncDiskService;
+
public TrackerDistributedCacheManager(Configuration conf,
TaskController taskController) throws IOException {
this.localFs = FileSystem.getLocal(conf);
@@ -86,6 +89,18 @@
}
/**
+ * Creates a TrackerDistributedCacheManager with a MRAsyncDiskService.
+ * @param asyncDiskService Provides a set of ThreadPools for async disk
+ * operations.
+ */
+ public TrackerDistributedCacheManager(Configuration conf,
+ TaskController taskController, MRAsyncDiskService asyncDiskService)
+ throws IOException {
+ this(conf, taskController);
+ this.asyncDiskService = asyncDiskService;
+ }
+
+ /**
* Get the locally cached file or archive; it could either be
* previously cached (and valid) or copy it from the {@link FileSystem} now.
*
@@ -252,8 +267,8 @@
// do the deletion, after releasing the global lock
for (CacheStatus lcacheStatus : deleteSet) {
synchronized (lcacheStatus) {
- FileSystem.getLocal(conf).delete(lcacheStatus.localizedLoadPath, true);
- LOG.info("Deleted path " + lcacheStatus.localizedLoadPath);
+ deleteLocalPath(asyncDiskService,
+ FileSystem.getLocal(conf), lcacheStatus.localizedLoadPath);
// decrement the size of the cache from baseDirSize
synchronized (baseDirSize) {
Long dirSize = baseDirSize.get(lcacheStatus.localizedBaseDir);
@@ -269,6 +284,30 @@
}
}
+ /**
+ * Delete a local path with asyncDiskService if available,
+ * or otherwise synchronously with local file system.
+ */
+ private static void deleteLocalPath(MRAsyncDiskService asyncDiskService,
+ LocalFileSystem fs, Path path) throws IOException {
+ boolean deleted = false;
+ if (asyncDiskService != null) {
+ // Try to delete using asyncDiskService
+ String localPathToDelete =
+ path.toUri().getPath();
+ deleted = asyncDiskService.moveAndDeleteAbsolutePath(localPathToDelete);
+ if (!deleted) {
+ LOG.warn("Cannot find DistributedCache path " + localPathToDelete
+ + " on any of the asyncDiskService volumes!");
+ }
+ }
+ if (!deleted) {
+ // If there is no asyncDiskService, or it could not move the path,
+ // delete the files synchronously
+ fs.delete(path, true);
+ }
+ LOG.info("Deleted path " + path);
+ }
+
/*
* Returns the relative path of the dir this cache will be localized in
* relative path that this cache will be localized in. For
@@ -620,7 +659,7 @@
synchronized (cachedArchives) {
for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
try {
- localFs.delete(f.getValue().localizedLoadPath, true);
+ deleteLocalPath(asyncDiskService, localFs, f.getValue().localizedLoadPath);
} catch (IOException ie) {
LOG.debug("Error cleaning up cache", ie);
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java Tue Jan 12 19:49:41 2010
@@ -22,31 +22,41 @@
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.AsyncDiskService;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
-/*
+/**
* This class is a container of multiple thread pools, each for a volume,
* so that we can schedule async disk operations easily.
*
* Examples of async disk operations are deletion of files.
- * We can move the files to a "TO_BE_DELETED" folder before asychronously
+ * We can move the files to a "toBeDeleted" folder before asynchronously
* deleting it, to make sure the caller can run it faster.
*
+ * Users should not write files into the "toBeDeleted" folder, otherwise
+ * the files can be gone any time we restart the MRAsyncDiskService.
+ *
* This class also contains all operations that will be performed by the
* thread pools.
*/
+@InterfaceAudience.Private
public class MRAsyncDiskService {
public static final Log LOG = LogFactory.getLog(MRAsyncDiskService.class);
AsyncDiskService asyncDiskService;
+ public static final String TOBEDELETED = "toBeDeleted";
+
/**
* Create a AsyncDiskServices with a set of volumes (specified by their
* root directories).
@@ -57,21 +67,53 @@
* @param localFileSystem The localFileSystem used for deletions.
* @param volumes The roots of the file system volumes.
*/
- public MRAsyncDiskService(FileSystem localFileSystem, String[] volumes) throws IOException {
-
- asyncDiskService = new AsyncDiskService(volumes);
+ public MRAsyncDiskService(FileSystem localFileSystem, String[] volumes)
+ throws IOException {
+ this.volumes = new String[volumes.length];
+ for (int v = 0; v < volumes.length; v++) {
+ this.volumes[v] = normalizePath(volumes[v]);
+ }
this.localFileSystem = localFileSystem;
- this.volumes = volumes;
+
+ asyncDiskService = new AsyncDiskService(this.volumes);
// Create one ThreadPool per volume
for (int v = 0 ; v < volumes.length; v++) {
// Create the root for file deletion
- if (!localFileSystem.mkdirs(new Path(volumes[v], SUBDIR))) {
- throw new IOException("Cannot create " + SUBDIR + " in " + volumes[v]);
+ Path absoluteSubdir = new Path(volumes[v], TOBEDELETED);
+ if (!localFileSystem.mkdirs(absoluteSubdir)) {
+ throw new IOException("Cannot create " + TOBEDELETED + " in " + volumes[v]);
}
}
+ // Create tasks to delete the paths inside the volumes
+ for (int v = 0 ; v < volumes.length; v++) {
+ Path absoluteSubdir = new Path(volumes[v], TOBEDELETED);
+ // List all files inside the volumes
+ FileStatus[] files = localFileSystem.listStatus(absoluteSubdir);
+ for (int f = 0; f < files.length; f++) {
+ // Get the relative file name to the root of the volume
+ String absoluteFilename = files[f].getPath().toUri().getPath();
+ String relative = getRelativePathName(absoluteFilename, volumes[v]);
+ if (relative == null) {
+ // This should never happen
+ throw new IOException("Cannot delete " + absoluteFilename
+ + " because it's outside of " + volumes[v]);
+ }
+ DeleteTask task = new DeleteTask(volumes[v], absoluteFilename,
+ relative);
+ execute(volumes[v], task);
+ }
+ }
+ }
+
+ /**
+ * Initialize MRAsyncDiskService based on conf.
+ * @param conf local file system and local dirs will be read from conf
+ */
+ public MRAsyncDiskService(JobConf conf) throws IOException {
+ this(FileSystem.getLocal(conf), conf.getLocalDirs());
}
/**
@@ -84,7 +126,7 @@
/**
* Gracefully start the shut down of all ThreadPools.
*/
- synchronized void shutdown() {
+ public synchronized void shutdown() {
asyncDiskService.shutdown();
}
@@ -99,7 +141,7 @@
* Wait for the termination of the thread pools.
*
* @param milliseconds The number of milliseconds to wait
- * @return true if all thread pools are terminated without time limit
+ * @return true if all thread pools are terminated within time limit
* @throws InterruptedException
*/
public synchronized boolean awaitTermination(long milliseconds)
@@ -107,15 +149,13 @@
return asyncDiskService.awaitTermination(milliseconds);
}
- public static final String SUBDIR = "toBeDeleted";
-
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss.SSS");
private FileSystem localFileSystem;
private String[] volumes;
- private int uniqueId = 0;
+ private static AtomicLong uniqueId = new AtomicLong(0);
/** A task for deleting a pathName from a volume.
*/
@@ -133,7 +173,7 @@
* @param volume The volume that the file/dir is in.
* @param originalPath The original name, relative to volume root.
* @param pathToBeDeleted The name after the move, relative to volume root,
- * containing SUBDIR.
+ * containing TOBEDELETED.
*/
DeleteTask(String volume, String originalPath, String pathToBeDeleted) {
this.volume = volume;
@@ -161,7 +201,8 @@
if (!success) {
if (e != null) {
- LOG.warn("Failure in " + this + " with exception " + StringUtils.stringifyException(e));
+ LOG.warn("Failure in " + this + " with exception "
+ + StringUtils.stringifyException(e));
} else {
LOG.warn("Failure in " + this);
}
@@ -181,23 +222,34 @@
* won't see the path name under the old name anyway after the move.
*
* @param volume The disk volume
- * @param pathName The path name relative to volume.
+ * @param pathName The path name relative to volume root.
* @throws IOException If the move failed
+ * @return false if the file is not found
*/
- public boolean moveAndDelete(String volume, String pathName) throws IOException {
+ public boolean moveAndDeleteRelativePath(String volume, String pathName)
+ throws IOException {
+
+ volume = normalizePath(volume);
+
// Move the file right now, so that it can be deleted later
- String newPathName;
- synchronized (this) {
- newPathName = format.format(new Date()) + "_" + uniqueId;
- uniqueId ++;
- }
- newPathName = SUBDIR + Path.SEPARATOR_CHAR + newPathName;
+ String newPathName =
+ format.format(new Date()) + "_" + uniqueId.getAndIncrement();
+ newPathName = TOBEDELETED + Path.SEPARATOR_CHAR + newPathName;
Path source = new Path(volume, pathName);
Path target = new Path(volume, newPathName);
try {
if (!localFileSystem.rename(source, target)) {
- return false;
+ // Try to recreate the parent directory just in case it gets deleted.
+ if (!localFileSystem.mkdirs(new Path(volume, TOBEDELETED))) {
+ throw new IOException("Cannot create " + TOBEDELETED + " under "
+ + volume);
+ }
+ // Try rename again. If it still fails, throw an IOException.
+ if (!localFileSystem.rename(source, target)) {
+ throw new IOException("Cannot rename " + source + " to "
+ + target);
+ }
}
} catch (FileNotFoundException e) {
// Return false in case that the file is not found.
@@ -217,13 +269,99 @@
* deletions are done. This is usually good enough because applications
* won't see the path name under the old name anyway after the move.
*
- * @param pathName The path name on each volume.
- * @throws IOException If the move failed
+ * @param pathName The path name relative to each volume root
+ * @throws IOException If any of the moves failed
+ * @return false if any of the target path names did not exist,
+ * note that the operation is still done on all volumes.
*/
- public void moveAndDeleteFromEachVolume(String pathName) throws IOException {
+ public boolean moveAndDeleteFromEachVolume(String pathName) throws IOException {
+ boolean result = true;
for (int i = 0; i < volumes.length; i++) {
- moveAndDelete(volumes[i], pathName);
+ result = result && moveAndDeleteRelativePath(volumes[i], pathName);
}
+ return result;
+ }
+
+ /**
+ * Move all files/directories inside volume into TOBEDELETED, and then
+ * delete them. The TOBEDELETED directory itself is ignored.
+ */
+ public void cleanupAllVolumes() throws IOException {
+ for (int v = 0; v < volumes.length; v++) {
+ // List all files inside the volumes
+ FileStatus[] files = localFileSystem.listStatus(new Path(volumes[v]));
+ for (int f = 0; f < files.length; f++) {
+ // Get the relative file name to the root of the volume
+ String absoluteFilename = files[f].getPath().toUri().getPath();
+ String relative = getRelativePathName(absoluteFilename, volumes[v]);
+ if (relative == null) {
+ // This should never happen
+ throw new IOException("Cannot delete " + absoluteFilename
+ + " because it's outside of " + volumes[v]);
+ }
+ // Do not delete the current TOBEDELETED
+ if (!TOBEDELETED.equals(relative)) {
+ moveAndDeleteRelativePath(volumes[v], relative);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the normalized path of a path.
+ */
+ private static String normalizePath(String path) {
+ return (new Path(path)).toUri().getPath();
+ }
+
+ /**
+ * Get the relative path name with respect to the root of the volume.
+ * @param absolutePathName The absolute path name
+ * @param volume Root of the volume.
+ * @return null if the absolute path name is outside of the volume.
+ */
+ private static String getRelativePathName(String absolutePathName,
+ String volume) {
+
+ absolutePathName = normalizePath(absolutePathName);
+ // Get the file names
+ if (!absolutePathName.startsWith(volume)) {
+ return null;
+ }
+ // Get rid of the volume prefix
+ String fileName = absolutePathName.substring(volume.length());
+ if (fileName.charAt(0) == Path.SEPARATOR_CHAR) {
+ fileName = fileName.substring(1);
+ }
+ return fileName;
+ }
+
+ /**
+ * Move the path name to a temporary location and then delete it.
+ *
+ * Note that if there is no volume that contains this path, the path
+ * will stay as it is, and the function will return false.
+ *
+ * This function returns when the moves are done, but not necessarily all
+ * deletions are done. This is usually good enough because applications
+ * won't see the path name under the old name anyway after the move.
+ *
+ * @param absolutePathName The path name from root "/"
+ * @throws IOException If the move failed
+ * @return false if we are unable to move the path name
+ */
+ public boolean moveAndDeleteAbsolutePath(String absolutePathName)
+ throws IOException {
+
+ for (int v = 0; v < volumes.length; v++) {
+ String relative = getRelativePathName(absolutePathName, volumes[v]);
+ if (relative != null) {
+ return moveAndDeleteRelativePath(volumes[v], relative);
+ }
+ }
+
+ throw new IOException("Cannot delete " + absolutePathName
+ + " because it's outside of all volumes.");
}
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java?rev=898486&r1=898485&r2=898486&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/util/TestMRAsyncDiskService.java Tue Jan 12 19:49:41 2010
@@ -18,11 +18,14 @@
package org.apache.hadoop.mapreduce.util;
import java.io.File;
+import java.io.IOException;
+
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.junit.Test;
/**
* A test for MRAsyncDiskService.
@@ -33,10 +36,10 @@
"test.build.data", "/tmp")).toString();
/**
- * This test creates one empty directory, and one directory with content,
- * and then removes them through MRAsyncDiskService.
- * @throws Throwable
+ * This test creates some directories and then removes them through
+ * MRAsyncDiskService.
*/
+ @Test
public void testMRAsyncDiskService() throws Throwable {
FileSystem localFileSystem = FileSystem.getLocal(new Configuration());
@@ -48,27 +51,139 @@
String a = "a";
String b = "b";
String c = "b/c";
+ String d = "d";
File fa = new File(vols[0], a);
File fb = new File(vols[1], b);
File fc = new File(vols[1], c);
+ File fd = new File(vols[1], d);
// Create the directories
fa.mkdirs();
fb.mkdirs();
fc.mkdirs();
+ fd.mkdirs();
assertTrue(fa.exists());
assertTrue(fb.exists());
assertTrue(fc.exists());
+ assertTrue(fd.exists());
// Move and delete them
- service.moveAndDelete(vols[0], a);
+ service.moveAndDeleteRelativePath(vols[0], a);
assertFalse(fa.exists());
- service.moveAndDelete(vols[1], b);
+ service.moveAndDeleteRelativePath(vols[1], b);
assertFalse(fb.exists());
assertFalse(fc.exists());
+ // asyncDiskService is NOT able to delete files outside all volumes.
+ IOException ee = null;
+ try {
+ service.moveAndDeleteAbsolutePath(TEST_ROOT_DIR + "/2");
+ } catch (IOException e) {
+ ee = e;
+ }
+ assertNotNull("asyncDiskService should not be able to delete files "
+ + "outside all volumes", ee);
+ // asyncDiskService is able to automatically find the file in one
+ // of the volumes.
+ assertTrue(service.moveAndDeleteAbsolutePath(vols[1] + Path.SEPARATOR_CHAR + d));
+
+ // Make sure everything is cleaned up
+ makeSureCleanedUp(vols, service);
+ }
+
+ /**
+ * This test creates some directories inside the volume roots, and then
+ * calls asyncDiskService.cleanupAllVolumes().
+ * We should be able to delete all files/dirs inside the volumes except
+ * the toBeDeleted directory.
+ */
+ @Test
+ public void testMRAsyncDiskServiceMoveAndDeleteAllVolumes() throws Throwable {
+ FileSystem localFileSystem = FileSystem.getLocal(new Configuration());
+ String[] vols = new String[]{TEST_ROOT_DIR + "/0",
+ TEST_ROOT_DIR + "/1"};
+ MRAsyncDiskService service = new MRAsyncDiskService(
+ localFileSystem, vols);
+
+ String a = "a";
+ String b = "b";
+ String c = "b/c";
+ String d = "d";
+
+ File fa = new File(vols[0], a);
+ File fb = new File(vols[1], b);
+ File fc = new File(vols[1], c);
+ File fd = new File(vols[1], d);
+
+ // Create the directories
+ fa.mkdirs();
+ fb.mkdirs();
+ fc.mkdirs();
+ fd.mkdirs();
+
+ assertTrue(fa.exists());
+ assertTrue(fb.exists());
+ assertTrue(fc.exists());
+ assertTrue(fd.exists());
+
+ // Delete all of them
+ service.cleanupAllVolumes();
+
+ assertFalse(fa.exists());
+ assertFalse(fb.exists());
+ assertFalse(fc.exists());
+ assertFalse(fd.exists());
+
+ // Make sure everything is cleaned up
+ makeSureCleanedUp(vols, service);
+ }
+
+ /**
+ * This test creates some directories inside the toBeDeleted directory and
+ * then starts the asyncDiskService.
+ * AsyncDiskService will create tasks to delete the content inside the
+ * toBeDeleted directories.
+ */
+ @Test
+ public void testMRAsyncDiskServiceStartupCleaning() throws Throwable {
+ FileSystem localFileSystem = FileSystem.getLocal(new Configuration());
+ String[] vols = new String[]{TEST_ROOT_DIR + "/0",
+ TEST_ROOT_DIR + "/1"};
+
+ String a = "a";
+ String b = "b";
+ String c = "b/c";
+ String d = "d";
+
+ // Create directories inside the TOBEDELETED directory
+ File fa = new File(vols[0] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, a);
+ File fb = new File(vols[1] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, b);
+ File fc = new File(vols[1] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, c);
+ File fd = new File(vols[1] + Path.SEPARATOR_CHAR + MRAsyncDiskService.TOBEDELETED, d);
+
+ // Create the directories
+ fa.mkdirs();
+ fb.mkdirs();
+ fc.mkdirs();
+ fd.mkdirs();
+
+ assertTrue(fa.exists());
+ assertTrue(fb.exists());
+ assertTrue(fc.exists());
+ assertTrue(fd.exists());
+
+ // Create the asyncDiskService which will delete all contents inside TOBEDELETED
+ MRAsyncDiskService service = new MRAsyncDiskService(
+ localFileSystem, vols);
+
+ // Make sure everything is cleaned up
+ makeSureCleanedUp(vols, service);
+ }
+
+ private void makeSureCleanedUp(String[] vols, MRAsyncDiskService service)
+ throws Throwable {
// Sleep at most 5 seconds to make sure the deleted items are all gone.
service.shutdown();
if (!service.awaitTermination(5000)) {
@@ -76,12 +191,18 @@
}
// All contents should be gone by now.
- for (int i = 0; i < 2; i++) {
- File toBeDeletedDir = new File(vols[0], MRAsyncDiskService.SUBDIR);
+ for (int i = 0; i < vols.length; i++) {
+ File subDir = new File(vols[0]);
+ String[] subDirContent = subDir.list();
+ assertEquals("Volume should contain a single child: "
+ + MRAsyncDiskService.TOBEDELETED, 1, subDirContent.length);
+
+ File toBeDeletedDir = new File(vols[0], MRAsyncDiskService.TOBEDELETED);
String[] content = toBeDeletedDir.list();
assertNotNull("Cannot find " + toBeDeletedDir, content);
- assertEquals("" + toBeDeletedDir + " should be empty now.",
- 0, content.length);
+ assertEquals("" + toBeDeletedDir + " should be empty now.", 0,
+ content.length);
}
}
+
}
|