hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1149760 - in /hadoop/common/trunk/common: ./ src/java/org/apache/hadoop/fs/ src/test/core/org/apache/hadoop/fs/
Date Fri, 22 Jul 2011 23:23:33 GMT
Author: suresh
Date: Fri Jul 22 23:23:32 2011
New Revision: 1149760

URL: http://svn.apache.org/viewvc?rev=1149760&view=rev
Log:
HADOOP-7460. Support pluggable trash policies. Contributed by Usman Masoon.

Added:
    hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/TrashPolicy.java
    hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/TrashPolicyDefault.java
Modified:
    hadoop/common/trunk/common/CHANGES.txt
    hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/Trash.java
    hadoop/common/trunk/common/src/test/core/org/apache/hadoop/fs/TestTrash.java

Modified: hadoop/common/trunk/common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/CHANGES.txt?rev=1149760&r1=1149759&r2=1149760&view=diff
==============================================================================
--- hadoop/common/trunk/common/CHANGES.txt (original)
+++ hadoop/common/trunk/common/CHANGES.txt Fri Jul 22 23:23:32 2011
@@ -58,6 +58,8 @@ Trunk (unreleased changes)
     HADOOP-7380. Add client failover functionality to o.a.h.io.(ipc|retry).
     (atm via eli)
 
+    HADOOP-7460. Support pluggable trash policies. (Usman Masoon via suresh)
+
   IMPROVEMENTS
 
     HADOOP-7042. Updates to test-patch.sh to include failed test names and

Modified: hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/Trash.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/Trash.java?rev=1149760&r1=1149759&r2=1149760&view=diff
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/Trash.java (original)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/Trash.java Fri Jul 22 23:23:32 2011
@@ -17,60 +17,26 @@
  */
 package org.apache.hadoop.fs;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
-
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.StringUtils;
-
-/** Provides a <i>trash</i> feature.  Files are moved to a user's trash
- * directory, a subdirectory of their home directory named ".Trash".  Files are
- * initially moved to a <i>current</i> sub-directory of the trash directory.
- * Within that sub-directory their original path is preserved.  Periodically
- * one may checkpoint the current trash and remove older checkpoints.  (This
- * design permits trash management without enumeration of the full trash
- * content, without date support in the filesystem, and without clock
- * synchronization.)
+
+/** 
+ * Provides a trash facility which supports pluggable Trash policies. 
+ *
+ * See the implementation of the configured TrashPolicy for more
+ * details.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class Trash extends Configured {
-  private static final Log LOG =
-    LogFactory.getLog(Trash.class);
-
-  private static final Path CURRENT = new Path("Current");
-  private static final Path TRASH = new Path(".Trash/");
-  
-
-  private static final FsPermission PERMISSION =
-    new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-
-  private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmmss");
-  private static final int MSECS_PER_MINUTE = 60*1000;
+  private TrashPolicy trashPolicy; // configured trash policy instance
 
-  private final FileSystem fs;
-  private final Path trash;
-  private final Path current;
-  private final long deletionInterval;
-  private final Path homesParent;
-
-  /** Construct a trash can accessor.
+  /** 
+   * Construct a trash can accessor.
    * @param conf a Configuration
    */
   public Trash(Configuration conf) throws IOException {
@@ -79,22 +45,18 @@ public class Trash extends Configured {
 
   /**
    * Construct a trash can accessor for the FileSystem provided.
+   * @param fs the FileSystem
+   * @param conf a Configuration
    */
   public Trash(FileSystem fs, Configuration conf) throws IOException {
     super(conf);
-    this.fs = fs;
-    this.trash = new Path(fs.getHomeDirectory(), TRASH);
-    this.homesParent = fs.getHomeDirectory().getParent();
-    this.current = new Path(trash, CURRENT);
-    this.deletionInterval = (long) (conf.getFloat(FS_TRASH_INTERVAL_KEY,
-                                         FS_TRASH_INTERVAL_DEFAULT) *
-                                MSECS_PER_MINUTE);
+    trashPolicy = TrashPolicy.getInstance(conf, fs, fs.getHomeDirectory());
   }
-  
+
   /**
    * In case of the symlinks or mount points, one has to move the appropriate
    * trashbin in the actual volume of the path p being deleted.
-   * 
+   *
    * Hence we get the file system of the fully-qualified resolved-path and
    * then move the path p to the trashbin in that volume,
    * @param fs - the filesystem of path p
@@ -115,240 +77,49 @@ public class Trash extends Configured {
     return success;
   }
   
-  private Trash(Path home, Configuration conf) throws IOException {
-    super(conf);
-    this.fs = home.getFileSystem(conf);
-    this.trash = new Path(home, TRASH);
-    this.homesParent = home.getParent();
-    this.current = new Path(trash, CURRENT);
-    this.deletionInterval = (long) (conf.getFloat(FS_TRASH_INTERVAL_KEY,
-                                         FS_TRASH_INTERVAL_DEFAULT) *
-                                MSECS_PER_MINUTE);
-  }
-  
-  private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
-    return new Path(basePath + rmFilePath.toUri().getPath());
-  }
-
   /**
    * Returns whether the trash is enabled for this filesystem
    */
   public boolean isEnabled() {
-    return (deletionInterval != 0);
+    return trashPolicy.isEnabled();
   }
 
   /** Move a file or directory to the current trash directory.
    * @return false if the item is already in the trash or trash is disabled
    */ 
   public boolean moveToTrash(Path path) throws IOException {
-    if (!isEnabled())
-      return false;
-
-    if (!path.isAbsolute())                       // make path absolute
-      path = new Path(fs.getWorkingDirectory(), path);
-
-    if (!fs.exists(path))                         // check that path exists
-      throw new FileNotFoundException(path.toString());
-
-    String qpath = fs.makeQualified(path).toString();
-
-    if (qpath.startsWith(trash.toString())) {
-      return false;                               // already in trash
-    }
-
-    if (trash.getParent().toString().startsWith(qpath)) {
-      throw new IOException("Cannot move \"" + path +
-                            "\" to the trash, as it contains the trash");
-    }
-
-    Path trashPath = makeTrashRelativePath(current, path);
-    Path baseTrashPath = makeTrashRelativePath(current, path.getParent());
-    
-    IOException cause = null;
-
-    // try twice, in case checkpoint between the mkdirs() & rename()
-    for (int i = 0; i < 2; i++) {
-      try {
-        if (!fs.mkdirs(baseTrashPath, PERMISSION)) {      // create current
-          LOG.warn("Can't create(mkdir) trash directory: "+baseTrashPath);
-          return false;
-        }
-      } catch (IOException e) {
-        LOG.warn("Can't create trash directory: "+baseTrashPath);
-        cause = e;
-        break;
-      }
-      try {
-        //
-        // if the target path in Trash already exists, then append with 
-        // a current time in millisecs.
-        //
-        String orig = trashPath.toString();
-        
-        while(fs.exists(trashPath)) {
-          trashPath = new Path(orig + System.currentTimeMillis());
-        }
-        
-        if (fs.rename(path, trashPath))           // move to current trash
-          return true;
-      } catch (IOException e) {
-        cause = e;
-      }
-    }
-    throw (IOException)
-      new IOException("Failed to move to trash: "+path).initCause(cause);
+    return trashPolicy.moveToTrash(path);
   }
 
   /** Create a trash checkpoint. */
   public void checkpoint() throws IOException {
-    if (!fs.exists(current))                      // no trash, no checkpoint
-      return;
-
-    Path checkpoint;
-    synchronized (CHECKPOINT) {
-      checkpoint = new Path(trash, CHECKPOINT.format(new Date()));
-    }
-
-    if (fs.rename(current, checkpoint)) {
-      LOG.info("Created trash checkpoint: "+checkpoint.toUri().getPath());
-    } else {
-      throw new IOException("Failed to checkpoint trash: "+checkpoint);
-    }
+    trashPolicy.createCheckpoint();
   }
 
-  /** Delete old checkpoints. */
+  /** Delete old checkpoint(s). */
   public void expunge() throws IOException {
-    FileStatus[] dirs = null;
-    
-    try {
-      dirs = fs.listStatus(trash);            // scan trash sub-directories
-    } catch (FileNotFoundException fnfe) {
-      return;
-    }
-
-    long now = System.currentTimeMillis();
-    for (int i = 0; i < dirs.length; i++) {
-      Path path = dirs[i].getPath();
-      String dir = path.toUri().getPath();
-      String name = path.getName();
-      if (name.equals(CURRENT.getName()))         // skip current
-        continue;
-
-      long time;
-      try {
-        synchronized (CHECKPOINT) {
-          time = CHECKPOINT.parse(name).getTime();
-        }
-      } catch (ParseException e) {
-        LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
-        continue;
-      }
-
-      if ((now - deletionInterval) > time) {
-        if (fs.delete(path, true)) {
-          LOG.info("Deleted trash checkpoint: "+dir);
-        } else {
-          LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring.");
-        }
-      }
-    }
+    trashPolicy.deleteCheckpoint();
   }
 
-  //
-  // get the current working directory
-  //
+  /** Get the current trash directory of the configured trash policy. */
   Path getCurrentTrashDir() {
-    return current;
+    return trashPolicy.getCurrentTrashDir();
+  }
+
+  /** get the configured trash policy */
+  TrashPolicy getTrashPolicy() {
+    return trashPolicy;
   }
 
   /** Return a {@link Runnable} that periodically empties the trash of all
-   * users, intended to be run by the superuser.  Only one checkpoint is kept
-   * at a time.
+   * users, intended to be run by the superuser.
    */
   public Runnable getEmptier() throws IOException {
-    return new Emptier(getConf());
-  }
-
-  private class Emptier implements Runnable {
-
-    private Configuration conf;
-    private long emptierInterval;
-
-    Emptier(Configuration conf) throws IOException {
-      this.conf = conf;
-      this.emptierInterval = (long) (conf.getFloat(FS_TRASH_CHECKPOINT_INTERVAL_KEY,
-                                     FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) *
-                                     MSECS_PER_MINUTE);
-      if (this.emptierInterval > deletionInterval ||
-          this.emptierInterval == 0) {
-        LOG.warn("The configured interval for checkpoint is " +
-                 this.emptierInterval + " minutes." +
-                 " Using interval of " + deletionInterval +
-                 " minutes that is used for deletion instead");
-        this.emptierInterval = deletionInterval;
-      }
-    }
-
-    public void run() {
-      if (emptierInterval == 0)
-        return;                                   // trash disabled
-      long now = System.currentTimeMillis();
-      long end;
-      while (true) {
-        end = ceiling(now, emptierInterval);
-        try {                                     // sleep for interval
-          Thread.sleep(end - now);
-        } catch (InterruptedException e) {
-          break;                                  // exit on interrupt
-        }
-
-        try {
-          now = System.currentTimeMillis();
-          if (now >= end) {
-
-            FileStatus[] homes = null;
-            try {
-              homes = fs.listStatus(homesParent);         // list all home dirs
-            } catch (IOException e) {
-              LOG.warn("Trash can't list homes: "+e+" Sleeping.");
-              continue;
-            }
-
-            for (FileStatus home : homes) {         // dump each trash
-              if (!home.isDirectory())
-                continue;
-              try {
-                Trash trash = new Trash(home.getPath(), conf);
-                trash.expunge();
-                trash.checkpoint();
-              } catch (IOException e) {
-                LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");
-              } 
-            }
-          }
-        } catch (Exception e) {
-          LOG.warn("RuntimeException during Trash.Emptier.run(): ", e); 
-        }
-      }
-      try {
-        fs.close();
-      } catch(IOException e) {
-        LOG.warn("Trash cannot close FileSystem: ", e);
-      }
-    }
-
-    private long ceiling(long time, long interval) {
-      return floor(time, interval) + interval;
-    }
-    private long floor(long time, long interval) {
-      return (time / interval) * interval;
-    }
-
+    return trashPolicy.getEmptier();
   }
 
   /** Run an emptier.*/
   public static void main(String[] args) throws Exception {
     new Trash(new Configuration()).getEmptier().run();
   }
-
 }

Added: hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/TrashPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/TrashPolicy.java?rev=1149760&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/TrashPolicy.java (added)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/TrashPolicy.java Fri Jul 22 23:23:32 2011
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+
+/** 
+ * This abstract class is the base for implementing different Trash policies.
+ * Provides factory method to create instances of the configured Trash policy.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class TrashPolicy extends Configured {
+  protected FileSystem fs; // the FileSystem
+  protected Path trash; // path to trash directory
+  protected long deletionInterval; // deletion interval for Emptier
+
+  /**
+   * Used to setup the trash policy. Must be implemented by all TrashPolicy
+   * implementations
+   * @param conf the configuration to be used
+   * @param fs the filesystem to be used
+   * @param home the home directory
+   */
+  public abstract void initialize(Configuration conf, FileSystem fs, Path home);
+
+  /**
+   * Returns whether the Trash Policy is enabled for this filesystem
+   */
+  public abstract boolean isEnabled();
+
+  /** 
+   * Move a file or directory to the current trash directory.
+   * @return false if the item is already in the trash or trash is disabled
+   */ 
+  public abstract boolean moveToTrash(Path path) throws IOException;
+
+  /** 
+   * Create a trash checkpoint. 
+   */
+  public abstract void createCheckpoint() throws IOException;
+
+  /** 
+   * Delete old trash checkpoint(s).
+   */
+  public abstract void deleteCheckpoint() throws IOException;
+
+  /**
+   * Get the current trash directory of the Trash Policy
+   */
+  public abstract Path getCurrentTrashDir();
+
+  /** 
+   * Return a {@link Runnable} that periodically empties the trash of all
+   * users, intended to be run by the superuser.
+   */
+  public abstract Runnable getEmptier() throws IOException;
+
+  /**
+   * Get an instance of the configured TrashPolicy based on the value 
+   * of the configuration parameter fs.trash.classname.
+   *
+   * @param conf the configuration to be used
+   * @param fs the file system to be used
+   * @param home the home directory
+   * @return an instance of TrashPolicy
+   */
+  public static TrashPolicy getInstance(Configuration conf, FileSystem fs, Path home)
+      throws IOException {
+    Class<? extends TrashPolicy> trashClass = conf.getClass("fs.trash.classname",
+                                                      TrashPolicyDefault.class,
+                                                      TrashPolicy.class);
+    TrashPolicy trash = (TrashPolicy) ReflectionUtils.newInstance(trashClass, conf);
+    trash.initialize(conf, fs, home); // initialize TrashPolicy
+    return trash;
+  }
+}

Added: hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/TrashPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/TrashPolicyDefault.java?rev=1149760&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/TrashPolicyDefault.java (added)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/fs/TrashPolicyDefault.java Fri Jul 22 23:23:32 2011
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/** Provides a <i>trash</i> feature.  Files are moved to a user's trash
+ * directory, a subdirectory of their home directory named ".Trash".  Files are
+ * initially moved to a <i>current</i> sub-directory of the trash directory.
+ * Within that sub-directory their original path is preserved.  Periodically
+ * one may checkpoint the current trash and remove older checkpoints.  (This
+ * design permits trash management without enumeration of the full trash
+ * content, without date support in the filesystem, and without clock
+ * synchronization.)
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class TrashPolicyDefault extends TrashPolicy {
+  private static final Log LOG =
+    LogFactory.getLog(TrashPolicyDefault.class);
+
+  private static final Path CURRENT = new Path("Current");
+  private static final Path TRASH = new Path(".Trash/");  
+
+  private static final FsPermission PERMISSION =
+    new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+
+  private static final DateFormat CHECKPOINT = new SimpleDateFormat("yyMMddHHmmss");
+  private static final int MSECS_PER_MINUTE = 60*1000;
+
+  private Path current;
+  private Path homesParent;
+
+  public TrashPolicyDefault() { }
+
+  private TrashPolicyDefault(Path home, Configuration conf) throws IOException {
+    initialize(conf, home.getFileSystem(conf), home);
+  }
+
+  @Override
+  public void initialize(Configuration conf, FileSystem fs, Path home) {
+    this.fs = fs;
+    this.trash = new Path(home, TRASH);
+    this.homesParent = home.getParent();
+    this.current = new Path(trash, CURRENT);
+    this.deletionInterval = (long) (conf.getFloat(FS_TRASH_INTERVAL_KEY,
+                                    FS_TRASH_INTERVAL_DEFAULT) *  MSECS_PER_MINUTE);
+  }
+  
+  private Path makeTrashRelativePath(Path basePath, Path rmFilePath) {
+    return new Path(basePath + rmFilePath.toUri().getPath());
+  }
+
+  @Override
+  public boolean isEnabled() {
+    return (deletionInterval != 0);
+  }
+
+  @Override
+  public boolean moveToTrash(Path path) throws IOException {
+    if (!isEnabled())
+      return false;
+
+    if (!path.isAbsolute())                       // make path absolute
+      path = new Path(fs.getWorkingDirectory(), path);
+
+    if (!fs.exists(path))                         // check that path exists
+      throw new FileNotFoundException(path.toString());
+
+    String qpath = fs.makeQualified(path).toString();
+
+    if (qpath.startsWith(trash.toString())) {
+      return false;                               // already in trash
+    }
+
+    if (trash.getParent().toString().startsWith(qpath)) {
+      throw new IOException("Cannot move \"" + path +
+                            "\" to the trash, as it contains the trash");
+    }
+
+    Path trashPath = makeTrashRelativePath(current, path);
+    Path baseTrashPath = makeTrashRelativePath(current, path.getParent());
+    
+    IOException cause = null;
+
+    // try twice, in case checkpoint between the mkdirs() & rename()
+    for (int i = 0; i < 2; i++) {
+      try {
+        if (!fs.mkdirs(baseTrashPath, PERMISSION)) {      // create current
+          LOG.warn("Can't create(mkdir) trash directory: "+baseTrashPath);
+          return false;
+        }
+      } catch (IOException e) {
+        LOG.warn("Can't create trash directory: "+baseTrashPath);
+        cause = e;
+        break;
+      }
+      try {
+        // if the target path in Trash already exists, then append with 
+        // a current time in millisecs.
+        String orig = trashPath.toString();
+        
+        while(fs.exists(trashPath)) {
+          trashPath = new Path(orig + System.currentTimeMillis());
+        }
+        
+        if (fs.rename(path, trashPath))           // move to current trash
+          return true;
+      } catch (IOException e) {
+        cause = e;
+      }
+    }
+    throw (IOException)
+      new IOException("Failed to move to trash: "+path).initCause(cause);
+  }
+
+  @Override
+  public void createCheckpoint() throws IOException {
+    if (!fs.exists(current))                     // no trash, no checkpoint
+      return;
+
+    Path checkpoint;
+    synchronized (CHECKPOINT) {
+      checkpoint = new Path(trash, CHECKPOINT.format(new Date()));
+    }
+
+    if (fs.rename(current, checkpoint)) {
+      LOG.info("Created trash checkpoint: "+checkpoint.toUri().getPath());
+    } else {
+      throw new IOException("Failed to checkpoint trash: "+checkpoint);
+    }
+  }
+
+  @Override
+  public void deleteCheckpoint() throws IOException {
+    FileStatus[] dirs = null;
+    
+    try {
+      dirs = fs.listStatus(trash);            // scan trash sub-directories
+    } catch (FileNotFoundException fnfe) {
+      return;
+    }
+
+    long now = System.currentTimeMillis();
+    for (int i = 0; i < dirs.length; i++) {
+      Path path = dirs[i].getPath();
+      String dir = path.toUri().getPath();
+      String name = path.getName();
+      if (name.equals(CURRENT.getName()))         // skip current
+        continue;
+
+      long time;
+      try {
+        synchronized (CHECKPOINT) {
+          time = CHECKPOINT.parse(name).getTime();
+        }
+      } catch (ParseException e) {
+        LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
+        continue;
+      }
+
+      if ((now - deletionInterval) > time) {
+        if (fs.delete(path, true)) {
+          LOG.info("Deleted trash checkpoint: "+dir);
+        } else {
+          LOG.warn("Couldn't delete checkpoint: "+dir+" Ignoring.");
+        }
+      }
+    }
+  }
+
+  @Override
+  public Path getCurrentTrashDir() {
+    return current;
+  }
+
+  @Override
+  public Runnable getEmptier() throws IOException {
+    return new Emptier(getConf());
+  }
+
+  private class Emptier implements Runnable {
+
+    private Configuration conf;
+    private long emptierInterval;
+
+    Emptier(Configuration conf) throws IOException {
+      this.conf = conf;
+      this.emptierInterval = (long) (conf.getFloat(FS_TRASH_CHECKPOINT_INTERVAL_KEY,
+                                     FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT) *
+                                     MSECS_PER_MINUTE);
+      if (this.emptierInterval > deletionInterval ||
+          this.emptierInterval == 0) {
+        LOG.warn("The configured interval for checkpoint is " +
+                 this.emptierInterval + " minutes." +
+                 " Using interval of " + deletionInterval +
+                 " minutes that is used for deletion instead");
+        this.emptierInterval = deletionInterval;
+      }
+    }
+
+    public void run() {
+      if (emptierInterval == 0)
+        return;                                   // trash disabled
+      long now = System.currentTimeMillis();
+      long end;
+      while (true) {
+        end = ceiling(now, emptierInterval);
+        try {                                     // sleep for interval
+          Thread.sleep(end - now);
+        } catch (InterruptedException e) {
+          break;                                  // exit on interrupt
+        }
+
+        try {
+          now = System.currentTimeMillis();
+          if (now >= end) {
+
+            FileStatus[] homes = null;
+            try {
+              homes = fs.listStatus(homesParent);         // list all home dirs
+            } catch (IOException e) {
+              LOG.warn("Trash can't list homes: "+e+" Sleeping.");
+              continue;
+            }
+
+            for (FileStatus home : homes) {         // dump each trash
+              if (!home.isDirectory())
+                continue;
+              try {
+                TrashPolicyDefault trash = new TrashPolicyDefault(home.getPath(), conf);
+                trash.deleteCheckpoint();
+                trash.createCheckpoint();
+              } catch (IOException e) {
+                LOG.warn("Trash caught: "+e+". Skipping "+home.getPath()+".");
+              } 
+            }
+          }
+        } catch (Exception e) {
+          LOG.warn("RuntimeException during Trash.Emptier.run(): ", e); 
+        }
+      }
+      try {
+        fs.close();
+      } catch(IOException e) {
+        LOG.warn("Trash cannot close FileSystem: ", e);
+      }
+    }
+
+    private long ceiling(long time, long interval) {
+      return floor(time, interval) + interval;
+    }
+    private long floor(long time, long interval) {
+      return (time / interval) * interval;
+    }
+  }
+}

Modified: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/fs/TestTrash.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/fs/TestTrash.java?rev=1149760&r1=1149759&r2=1149760&view=diff
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/fs/TestTrash.java (original)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/fs/TestTrash.java Fri Jul 22 23:23:32 2011
@@ -480,6 +480,15 @@ public class TestTrash extends TestCase 
     trashNonDefaultFS(conf);
   }
   
+  public void testPluggableTrash() throws IOException {
+    Configuration conf = new Configuration();
+
+    // Test plugged TrashPolicy
+    conf.setClass("fs.trash.classname", TestTrashPolicy.class, TrashPolicy.class);
+    Trash trash = new Trash(conf);
+    assertTrue(trash.getTrashPolicy().getClass().equals(TestTrashPolicy.class));
+  }
+
   public void testTrashEmptier() throws Exception {
     Configuration conf = new Configuration();
     // Trash with 12 second deletes and 6 seconds checkpoints
@@ -638,4 +647,41 @@ public class TestTrash extends TestCase 
     // run performance piece as a separate test
     performanceTestDeleteSameFile();
   }
+
+  // Test TrashPolicy. Don't care about implementation.
+  public static class TestTrashPolicy extends TrashPolicy {
+    public TestTrashPolicy() { }
+
+    @Override
+    public void initialize(Configuration conf, FileSystem fs, Path home) {
+    }
+
+    @Override
+    public boolean isEnabled() {
+      return false;
+    }
+
+    @Override 
+    public boolean moveToTrash(Path path) throws IOException {
+      return false;
+    }
+
+    @Override
+    public void createCheckpoint() throws IOException {
+    }
+
+    @Override
+    public void deleteCheckpoint() throws IOException {
+    }
+
+    @Override
+    public Path getCurrentTrashDir() {
+      return null;
+    }
+
+    @Override
+    public Runnable getEmptier() throws IOException {
+      return null;
+    }
+  }
 }



Mime
View raw message