hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject hbase git commit: HBASE-18309 Support multi threads in CleanerChore
Date Fri, 24 Nov 2017 05:37:05 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 66e650c39 -> 2838cf3e0


HBASE-18309 Support multi threads in CleanerChore

Signed-off-by: Yu Li <liyu@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2838cf3e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2838cf3e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2838cf3e

Branch: refs/heads/branch-2
Commit: 2838cf3e0506d9be8c001ebbadc0c3acf68d762c
Parents: 66e650c
Author: Reid Chan <reidddchan@outlook.com>
Authored: Thu Nov 23 11:21:12 2017 +0800
Committer: Yu Li <liyu@apache.org>
Committed: Fri Nov 24 13:36:55 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/master/HMaster.java |   1 +
 .../hbase/master/cleaner/CleanerChore.java      | 335 +++++++++++++------
 .../hbase/master/cleaner/HFileCleaner.java      |   8 +-
 .../hadoop/hbase/master/cleaner/LogCleaner.java | 168 +++++++++-
 .../hbase/master/cleaner/TestCleanerChore.java  |  61 +++-
 .../hbase/master/cleaner/TestLogsCleaner.java   |  51 +++
 6 files changed, 514 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2838cf3e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index fcaf55f..cde3581 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -908,6 +908,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.masterFinishedInitializationTime = System.currentTimeMillis();
     configurationManager.registerObserver(this.balancer);
     configurationManager.registerObserver(this.hfileCleaner);
+    configurationManager.registerObserver(this.logCleaner);
 
     // Set master as 'initialized'.
     setInitialized(true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2838cf3e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index b8ca1ec..582df84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -17,18 +17,22 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -46,16 +50,33 @@ import org.apache.hadoop.ipc.RemoteException;
  * Abstract Cleaner that uses a chain of delegates to clean a directory of files
  * @param <T> Cleaner delegate class that is dynamically loaded from configuration
  */
-public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
{
+public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
+    implements ConfigurationObserver {
 
   private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
+  private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
+
+  /**
+   * If it is an integer and >= 1, it would be the size;
+   * if 0.0 < size <= 1.0, size would be available processors * size.
+   * Pay attention that 1.0 is different from 1, former indicates it will use 100% of cores,
+   * while latter will use only 1 thread for chore to scan dir.
+   */
+  public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
+  private static final String DEFAULT_CHORE_POOL_SIZE = "0.5";
+
+  // It may be waste resources for each cleaner chore own its pool,
+  // so let's make pool for all cleaner chores.
+  private static volatile ForkJoinPool chorePool;
+  private static volatile int chorePoolSize;
 
   protected final FileSystem fs;
   private final Path oldFileDir;
   private final Configuration conf;
+  protected final Map<String, Object> params;
+  private final AtomicBoolean enabled = new AtomicBoolean(true);
+  private final AtomicBoolean reconfig = new AtomicBoolean(false);
   protected List<T> cleanersChain;
-  protected Map<String, Object> params;
-  private AtomicBoolean enabled = new AtomicBoolean(true);
 
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration
conf,
                       FileSystem fs, Path oldFileDir, String confKey) {
@@ -80,8 +101,42 @@ public abstract class CleanerChore<T extends FileCleanerDelegate>
extends Schedu
     this.conf = conf;
     this.params = params;
     initCleanerChain(confKey);
+
+    if (chorePool == null) {
+      String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
+      chorePoolSize = calculatePoolSize(poolSize);
+      // poolSize may be 0 or 0.0 from a careless configuration,
+      // double check to make sure.
+      chorePoolSize = chorePoolSize == 0 ?
+          calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : chorePoolSize;
+      this.chorePool = new ForkJoinPool(chorePoolSize);
+      LOG.info("Cleaner pool size is " + chorePoolSize);
+    }
   }
 
+  /**
+   * Calculate size for cleaner pool.
+   * @param poolSize size from configuration
+   * @return size of pool after calculation
+   */
+  private int calculatePoolSize(String poolSize) {
+    if (poolSize.matches("[1-9][0-9]*")) {
+      // If poolSize is an integer, return it directly,
+      // but upmost to the number of available processors.
+      int size = Math.min(Integer.valueOf(poolSize), AVAIL_PROCESSORS);
+      if (size == AVAIL_PROCESSORS) {
+        LOG.warn("Use full core processors to scan dir");
+      }
+      return size;
+    } else if (poolSize.matches("0.[0-9]+|1.0")) {
+      // if poolSize is a double, return poolSize * availableProcessors;
+      return (int) (AVAIL_PROCESSORS * Double.valueOf(poolSize));
+    } else {
+      LOG.error("Unrecognized value: " + poolSize + " for " + CHORE_POOL_SIZE +
+          ", use default config: " + DEFAULT_CHORE_POOL_SIZE + " instead.");
+      return calculatePoolSize(DEFAULT_CHORE_POOL_SIZE);
+    }
+  }
 
   /**
    * Validate the file to see if it even belongs in the directory. If it is valid, then the
file
@@ -109,6 +164,33 @@ public abstract class CleanerChore<T extends FileCleanerDelegate>
extends Schedu
     }
   }
 
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    int updatedSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
+    if (updatedSize == chorePoolSize) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Size from configuration is the same as previous which is " +
+          updatedSize + ", no need to update.");
+      }
+      return;
+    }
+    chorePoolSize = updatedSize;
+    if (chorePool.getPoolSize() == 0) {
+      // Chore does not work now, update it directly.
+      updateChorePoolSize(updatedSize);
+      return;
+    }
+    // Chore is working, update it after chore finished.
+    reconfig.set(true);
+  }
+
+  private void updateChorePoolSize(int updatedSize) {
+    chorePool.shutdownNow();
+    LOG.info("Update chore's pool size from " +
+        chorePool.getParallelism() + " to " + updatedSize);
+    chorePool = new ForkJoinPool(updatedSize);
+  }
+
   /**
    * A utility method to create new instances of LogCleanerDelegate based on the class name
of the
    * LogCleanerDelegate.
@@ -135,7 +217,17 @@ public abstract class CleanerChore<T extends FileCleanerDelegate>
extends Schedu
   @Override
   protected void chore() {
     if (getEnabled()) {
-      runCleaner();
+      if (runCleaner()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cleaned old files/dirs under " + oldFileDir + " successfully.");
+        }
+      } else {
+        LOG.warn("Failed to fully clean old files/dirs under " + oldFileDir + ".");
+      }
+      // After each clean chore, checks if receives reconfigure notification while cleaning
+      if (reconfig.compareAndSet(true, false)) {
+        updateChorePoolSize(chorePoolSize);
+      }
     } else {
       LOG.debug("Cleaner chore disabled! Not cleaning.");
     }
@@ -147,16 +239,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate>
extends Schedu
 
   public Boolean runCleaner() {
     preRunCleaner();
-    try {
-      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
-      checkAndDeleteEntries(files);
-    } catch (IOException e) {
-      e = e instanceof RemoteException ?
-              ((RemoteException)e).unwrapRemoteException() : e;
-      LOG.warn("Error while cleaning the logs", e);
-      return false;
-    }
-    return true;
+    CleanerTask task = new CleanerTask(this.oldFileDir, true);
+    chorePool.submit(task);
+    return task.join();
   }
 
   /**
@@ -199,95 +284,16 @@ public abstract class CleanerChore<T extends FileCleanerDelegate>
extends Schedu
   }
 
   /**
-   * Loop over the given directory entries, and check whether they can be deleted.
-   * If an entry is itself a directory it will be recursively checked and deleted itself
iff
-   * all subentries are deleted (and no new subentries are added in the mean time)
-   *
-   * @param entries directory entries to check
-   * @return true if all entries were successfully deleted
-   */
-  private boolean checkAndDeleteEntries(FileStatus[] entries) {
-    if (entries == null) {
-      return true;
-    }
-    boolean allEntriesDeleted = true;
-    List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
-    List<FileStatus> dirs = new ArrayList<>();
-    for (FileStatus child : entries) {
-      if (child.isDirectory()) {
-        dirs.add(child);
-      } else {
-        // collect all files to attempt to delete in one batch
-        files.add(child);
-      }
-    }
-    if (dirs.size() > 0) {
-      sortByConsumedSpace(dirs);
-      LOG.debug("Prepared to delete files in directories: " + dirs);
-      for (FileStatus child : dirs) {
-        Path path = child.getPath();
-        // for each subdirectory delete it and all entries if possible
-        if (!checkAndDeleteDirectory(path)) {
-          allEntriesDeleted = false;
-        }
-      }
-    }
-    if (!checkAndDeleteFiles(files)) {
-      allEntriesDeleted = false;
-    }
-    return allEntriesDeleted;
-  }
-  
-  /**
-   * Attempt to delete a directory and all files under that directory. Each child file is
passed
-   * through the delegates to see if it can be deleted. If the directory has no children
when the
-   * cleaners have finished it is deleted.
-   * <p>
-   * If new children files are added between checks of the directory, the directory will
<b>not</b>
-   * be deleted.
-   * @param dir directory to check
-   * @return <tt>true</tt> if the directory was deleted, <tt>false</tt>
otherwise.
-   */
-  @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Checking directory: " + dir);
-    }
-
-    try {
-      FileStatus[] children = FSUtils.listStatus(fs, dir);
-      boolean allChildrenDeleted = checkAndDeleteEntries(children);
-  
-      // if the directory still has children, we can't delete it, so we are done
-      if (!allChildrenDeleted) return false;
-    } catch (IOException e) {
-      e = e instanceof RemoteException ?
-              ((RemoteException)e).unwrapRemoteException() : e;
-      LOG.warn("Error while listing directory: " + dir, e);
-      // couldn't list directory, so don't try to delete, and don't return success
-      return false;
-    }
-
-    // otherwise, all the children (that we know about) have been deleted, so we should try
to
-    // delete this directory. However, don't do so recursively so we don't delete files that
have
-    // been added since we last checked.
-    try {
-      return fs.delete(dir, false);
-    } catch (IOException e) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Couldn't delete directory: " + dir, e);
-      }
-      // couldn't delete w/o exception, so we can't return success.
-      return false;
-    }
-  }
-
-  /**
    * Run the given files through each of the cleaners to see if it should be deleted, deleting
it if
    * necessary.
    * @param files List of FileStatus for the files to check (and possibly delete)
    * @return true iff successfully deleted all files
    */
   private boolean checkAndDeleteFiles(List<FileStatus> files) {
+    if (files == null) {
+      return true;
+    }
+
     // first check to see if the path is valid
     List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
     List<FileStatus> invalidFiles = Lists.newArrayList();
@@ -368,6 +374,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate>
extends Schedu
     }
   }
 
+  @VisibleForTesting
+  int getChorePoolSize() {
+    return chorePoolSize;
+  }
+
   /**
    * @param enabled
    */
@@ -378,4 +389,132 @@ public abstract class CleanerChore<T extends FileCleanerDelegate>
extends Schedu
   public boolean getEnabled() {
     return this.enabled.get();
   }
+
+  private interface Action<T> {
+    T act() throws IOException;
+  }
+
+  private class CleanerTask extends RecursiveTask<Boolean> {
+    private final Path dir;
+    private final boolean root;
+
+    CleanerTask(final FileStatus dir, final boolean root) {
+      this(dir.getPath(), root);
+    }
+
+    CleanerTask(final Path dir, final boolean root) {
+      this.dir = dir;
+      this.root = root;
+    }
+
+    @Override
+    protected Boolean compute() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("CleanerTask " + Thread.currentThread().getId() +
+          " starts cleaning dirs and files under " + dir + " and itself.");
+      }
+
+      List<FileStatus> subDirs;
+      List<FileStatus> files;
+      try {
+        subDirs = getFilteredStatus(status -> status.isDirectory());
+        files = getFilteredStatus(status -> status.isFile());
+      } catch (IOException ioe) {
+        LOG.warn(dir + " doesn't exist, just skip it. ", ioe);
+        return true;
+      }
+
+      boolean nullSubDirs = subDirs == null;
+      if (nullSubDirs) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("There is no subdir under " + dir);
+        }
+      }
+      if (files == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("There is no file under " + dir);
+        }
+      }
+
+      int capacity = nullSubDirs ? 0 : subDirs.size();
+      List<CleanerTask> tasks = Lists.newArrayListWithCapacity(capacity);
+      if (!nullSubDirs) {
+        sortByConsumedSpace(subDirs);
+        for (FileStatus subdir : subDirs) {
+          CleanerTask task = new CleanerTask(subdir, false);
+          tasks.add(task);
+          task.fork();
+        }
+      }
+
+      boolean result = true;
+      result &= deleteAction(() -> checkAndDeleteFiles(files), "files");
+      result &= deleteAction(() -> getCleanRusult(tasks), "subdirs");
+      // if and only if files and subdirs under current dir are deleted successfully, and
+      // it is not the root dir, then task will try to delete it.
+      if (result && !root) {
+        result &= deleteAction(() -> fs.delete(dir, false), "dir");
+      }
+      return result;
+    }
+
+    /**
+     * Get FileStatus with filter.
+     * Pay attention that FSUtils #listStatusWithStatusFilter would return null,
+     * even though status is empty but not null.
+     * @param function a filter function
+     * @return filtered FileStatus
+     * @throws IOException if there's no such a directory
+     */
+    private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function)
throws IOException {
+      return FSUtils.listStatusWithStatusFilter(fs, dir, status -> function.test(status));
+    }
+
+    /**
+     * Perform a delete on a specified type.
+     * @param deletion a delete
+     * @param type possible values are 'files', 'subdirs', 'dirs'
+     * @return true if it deleted successfully, false otherwise
+     */
+    private boolean deleteAction(Action<Boolean> deletion, String type) {
+      boolean deleted;
+      String errorMsg = "";
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Start deleting " + type + " under " + dir);
+        }
+        deleted = deletion.act();
+      } catch (IOException ioe) {
+        errorMsg = ioe.getMessage();
+        deleted = false;
+      }
+      if (LOG.isDebugEnabled()) {
+        if (deleted) {
+          LOG.debug("Finish deleting " + type + " under " + dir);
+        } else {
+          LOG.debug("Couldn't delete " + type + " completely under " + dir +
+            " with reasons: " + (!errorMsg.equals("") ? errorMsg : " undeletable, please
check."));
+        }
+      }
+      return deleted;
+    }
+
+    /**
+     * Get cleaner results of subdirs.
+     * @param tasks subdirs cleaner tasks
+     * @return true if all subdirs deleted successfully, false for patial/all failures
+     * @throws IOException something happen during computation
+     */
+    private boolean getCleanRusult(List<CleanerTask> tasks) throws IOException {
+      boolean cleaned = true;
+      try {
+        for (CleanerTask task : tasks) {
+          cleaned &= task.get();
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
+      }
+      return cleaned;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2838cf3e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index e91d4f1..5c78dc4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -32,11 +32,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.StealJobQueue;
+import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 /**
@@ -44,8 +43,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
  * folder that are deletable for each HFile cleaner in the chain.
  */
 @InterfaceAudience.Private
-public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
-    ConfigurationObserver {
+public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
 
   public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
 
@@ -390,6 +388,8 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
impleme
 
   @Override
   public void onConfigurationChange(Configuration conf) {
+    super.onConfigurationChange(conf);
+
     if (!checkAndUpdateConfigurations(conf)) {
       LOG.debug("Update configuration triggered but nothing changed for this cleaner");
       return;

http://git-wip-us.apache.org/repos/asf/hbase/blob/2838cf3e/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index 8569cb5..3cb620e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -19,15 +19,24 @@ package org.apache.hadoop.hbase.master.cleaner;
 
 import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
 /**
  * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in
the old
@@ -38,6 +47,12 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
   private static final Log LOG = LogFactory.getLog(LogCleaner.class.getName());
 
+  public static final String OLD_WALS_CLEANER_SIZE = "hbase.oldwals.cleaner.thread.size";
+  public static final int OLD_WALS_CLEANER_DEFAULT_SIZE = 2;
+
+  private final LinkedBlockingQueue<CleanerContext> pendingDelete;
+  private List<Thread> oldWALsCleaner;
+
   /**
    * @param period the period of time to sleep between each run
    * @param stopper the stopper
@@ -48,6 +63,9 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
{
   public LogCleaner(final int period, final Stoppable stopper, Configuration conf, FileSystem
fs,
       Path oldLogDir) {
     super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS);
+    this.pendingDelete = new LinkedBlockingQueue<>();
+    int size = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
+    this.oldWALsCleaner = createOldWalsCleaner(size);
   }
 
   @Override
@@ -55,4 +73,152 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate>
{
     return AbstractFSWALProvider.validateWALFilename(file.getName())
         || MasterProcedureUtil.validateProcedureWALFilename(file.getName());
   }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    super.onConfigurationChange(conf);
+
+    int newSize = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
+    if (newSize == oldWALsCleaner.size()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Size from configuration is the same as previous which is " +
+          newSize + ", no need to update.");
+      }
+      return;
+    }
+    interruptOldWALsCleaner();
+    oldWALsCleaner = createOldWalsCleaner(newSize);
+  }
+
+  @Override
+  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
+    List<CleanerContext> results = new LinkedList<>();
+    for (FileStatus toDelete : filesToDelete) {
+      CleanerContext context = CleanerContext.createCleanerContext(toDelete);
+      if (context != null) {
+        pendingDelete.add(context);
+        results.add(context);
+      }
+    }
+
+    int deletedFiles = 0;
+    for (CleanerContext res : results) {
+      deletedFiles += res.getResult(500) ? 1 : 0;
+    }
+    return deletedFiles;
+  }
+
+  @Override
+  public void cleanup() {
+    super.cleanup();
+    interruptOldWALsCleaner();
+  }
+
+  @VisibleForTesting
+  int getSizeOfCleaners() {
+    return oldWALsCleaner.size();
+  }
+
+  private List<Thread> createOldWalsCleaner(int size) {
+    LOG.info("Creating OldWALs cleaners with size: " + size);
+
+    List<Thread> oldWALsCleaner = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      Thread cleaner = new Thread(() -> deleteFile());
+      cleaner.setName("OldWALsCleaner-" + i);
+      cleaner.setDaemon(true);
+      cleaner.start();
+      oldWALsCleaner.add(cleaner);
+    }
+    return oldWALsCleaner;
+  }
+
+  private void interruptOldWALsCleaner() {
+    for (Thread cleaner : oldWALsCleaner) {
+      cleaner.interrupt();
+    }
+    oldWALsCleaner.clear();
+  }
+
+  private void deleteFile() {
+    while (true) {
+      CleanerContext context = null;
+      boolean succeed = false;
+      boolean interrupted = false;
+      try {
+        context = pendingDelete.take();
+        if (context != null) {
+          FileStatus toClean = context.getTargetToClean();
+          succeed = this.fs.delete(toClean.getPath(), false);
+        }
+      } catch (InterruptedException ite) {
+        // It's most likely from configuration changing request
+        if (context != null) {
+          LOG.warn("Interrupted while cleaning oldWALs " +
+              context.getTargetToClean() + ", try to clean it next round.");
+        }
+        interrupted = true;
+      } catch (IOException e) {
+        // fs.delete() fails.
+        LOG.warn("Failed to clean oldwals with exception: " + e);
+        succeed = false;
+      } finally {
+        context.setResult(succeed);
+        if (interrupted) {
+          // Restore interrupt status
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Exiting cleaner.");
+    }
+  }
+
+  private static final class CleanerContext {
+    // At most waits 60 seconds
+    static final long MAX_WAIT = 60 * 1000;
+
+    final FileStatus target;
+    volatile boolean result;
+    volatile boolean setFromCleaner = false;
+
+    static CleanerContext createCleanerContext(FileStatus status) {
+      return status != null ? new CleanerContext(status) : null;
+    }
+
+    private CleanerContext(FileStatus status) {
+      this.target = status;
+      this.result = false;
+    }
+
+    synchronized void setResult(boolean res) {
+      this.result = res;
+      this.setFromCleaner = true;
+      notify();
+    }
+
+    synchronized boolean getResult(long waitIfNotFinished) {
+      long totalTime = 0;
+      try {
+        while (!setFromCleaner) {
+          wait(waitIfNotFinished);
+          totalTime += waitIfNotFinished;
+          if (totalTime >= MAX_WAIT) {
+            LOG.warn("Spend too much time to delete oldwals " + target);
+            return result;
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting deletion of " + target);
+        return result;
+      }
+      return result;
+    }
+
+    FileStatus getTargetToClean() {
+      return target;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/2838cf3e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
index 566479a..39bdbc7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -51,8 +54,7 @@ public class TestCleanerChore {
   public void cleanup() throws Exception {
     // delete and recreate the test directory, ensuring a clean test dir between tests
     UTIL.cleanupTestDir();
-}
-
+  }
 
   @Test
   public void testSavesFilesOnRequest() throws Exception {
@@ -276,11 +278,8 @@ public class TestCleanerChore {
       }
     }).when(spy).isFileDeletable(Mockito.any());
 
-    // attempt to delete the directory, which
-    if (chore.checkAndDeleteDirectory(parent)) {
-      throw new Exception(
-          "Reported success deleting directory, should have failed when adding file mid-iteration");
-    }
+    // run the chore
+    chore.chore();
 
     // make sure all the directories + added file exist, but the original file is deleted
     assertTrue("Added file unexpectedly deleted", fs.exists(racyFile));
@@ -355,6 +354,54 @@ public class TestCleanerChore {
     assertTrue("Directory got deleted", fs.exists(parent));
   }
 
+  @Test
+  public void testOnConfigurationChange() throws Exception {
+    Stoppable stop = new StoppableImplementation();
+    Configuration conf = UTIL.getConfiguration();
+    Path testDir = UTIL.getDataTestDir();
+    FileSystem fs = UTIL.getTestFileSystem();
+    String confKey = "hbase.test.cleaner.delegates";
+    conf.set(confKey, AlwaysDelete.class.getName());
+    conf.set(CleanerChore.CHORE_POOL_SIZE, "2");
+    AllValidPaths chore = new AllValidPaths("test-file-cleaner", stop, conf, fs, testDir,
confKey);
+    chore.setEnabled(true);
+    // Create subdirs under testDir
+    int dirNums = 6;
+    Path[] subdirs = new Path[dirNums];
+    for (int i = 0; i < dirNums; i++) {
+      subdirs[i] = new Path(testDir, "subdir-" + i);
+      fs.mkdirs(subdirs[i]);
+    }
+    // Under each subdirs create 6 files
+    for (Path subdir : subdirs) {
+      createFiles(fs, subdir, 6);
+    }
+    // Start chore
+    Thread t = new Thread(() -> chore.chore());
+    t.setDaemon(true);
+    t.start();
+    // Change size of chore's pool
+    conf.set(CleanerChore.CHORE_POOL_SIZE, "4");
+    chore.onConfigurationChange(conf);
+    assertEquals(4, chore.getChorePoolSize());
+    // Stop chore
+    t.join();
+  }
+
+  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException
{
+    Random random = new Random();
+    for (int i = 0; i < numOfFiles; i++) {
+      int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M
+      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
+        for (int m = 0; m < xMega; m++) {
+          byte[] M = new byte[1024 * 1024];
+          random.nextBytes(M);
+          fsdos.write(M);
+        }
+      }
+    }
+  }
+
   private static class AllValidPaths extends CleanerChore<BaseHFileCleanerDelegate>
{
 
     public AllValidPaths(String name, Stoppable s, Configuration conf, FileSystem fs,

http://git-wip-us.apache.org/repos/asf/hbase/blob/2838cf3e/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 98176fe..34e81db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -29,9 +29,11 @@ import java.net.URLEncoder;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -76,11 +78,13 @@ public class TestLogsCleaner {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
+    TEST_UTIL.startMiniDFSCluster(1);
   }
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     TEST_UTIL.shutdownMiniZKCluster();
+    TEST_UTIL.shutdownMiniDFSCluster();
   }
 
   /**
@@ -248,6 +252,53 @@ public class TestLogsCleaner {
     }
   }
 
+  @Test
+  public void testOnConfigurationChange() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE);
+    // Prepare environments
+    Server server = new DummyServer();
+    Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
+        HConstants.HREGION_OLDLOGDIR_NAME);
+    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir);
+    assertEquals(LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE, cleaner.getSizeOfCleaners());
+    // Create dir and files for test
+    fs.delete(oldWALsDir, true);
+    fs.mkdirs(oldWALsDir);
+    int numOfFiles = 10;
+    createFiles(fs, oldWALsDir, numOfFiles);
+    FileStatus[] status = fs.listStatus(oldWALsDir);
+    assertEquals(numOfFiles, status.length);
+    // Start cleaner chore
+    Thread thread = new Thread(() -> cleaner.chore());
+    thread.setDaemon(true);
+    thread.start();
+    // change size of cleaners dynamically
+    int sizeToChange = 4;
+    conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, sizeToChange);
+    cleaner.onConfigurationChange(conf);
+    assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
+    // Stop chore
+    thread.join();
+    status = fs.listStatus(oldWALsDir);
+    assertEquals(0, status.length);
+  }
+
+  private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException
{
+    Random random = new Random();
+    for (int i = 0; i < numOfFiles; i++) {
+      int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M
+      try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
+        for (int m = 0; m < xMega; m++) {
+          byte[] M = new byte[1024 * 1024];
+          random.nextBytes(M);
+          fsdos.write(M);
+        }
+      }
+    }
+  }
+
   static class DummyServer implements Server {
 
     @Override


Mime
View raw message