hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject hbase git commit: HBASE-17215 Separate small/large file delete threads in HFileCleaner to accelerate archived hfile cleanup speed
Date Sat, 01 Apr 2017 03:00:10 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 5f98ad205 -> 9facfa550


HBASE-17215 Separate small/large file delete threads in HFileCleaner to accelerate archived
hfile cleanup speed


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9facfa55
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9facfa55
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9facfa55

Branch: refs/heads/master
Commit: 9facfa550f1e7386be3a04d84f7e8013f5002965
Parents: 5f98ad2
Author: Yu Li <liyu@apache.org>
Authored: Sat Apr 1 10:59:11 2017 +0800
Committer: Yu Li <liyu@apache.org>
Committed: Sat Apr 1 10:59:11 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/master/HMaster.java |   1 +
 .../hbase/master/cleaner/CleanerChore.java      |  14 +-
 .../hbase/master/cleaner/HFileCleaner.java      | 313 ++++++++++++++++++-
 .../hbase/regionserver/RSRpcServices.java       |   3 +-
 .../hbase/master/cleaner/TestHFileCleaner.java  | 152 +++++++++
 5 files changed, 478 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9facfa55/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index a1cbe53..bb9f282 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -885,6 +885,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     status.markComplete("Initialization successful");
     LOG.info("Master has completed initialization");
     configurationManager.registerObserver(this.balancer);
+    configurationManager.registerObserver(this.hfileCleaner);
 
     // Set master as 'initialized'.
     setInitialized(true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/9facfa55/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index dddad36..825feba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -46,7 +46,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate>
extends Schedu
 
   private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
 
-  private final FileSystem fs;
+  protected final FileSystem fs;
   private final Path oldFileDir;
   private final Configuration conf;
   protected List<T> cleanersChain;
@@ -269,6 +269,15 @@ public abstract class CleanerChore<T extends FileCleanerDelegate>
extends Schedu
     }
     
     Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
+    return deleteFiles(filesToDelete) == files.size();
+  }
+
+  /**
+   * Delete the given files
+   * @param filesToDelete files to delete
+   * @return number of deleted files
+   */
+  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
     int deletedFileCount = 0;
     for (FileStatus file : filesToDelete) {
       Path filePath = file.getPath();
@@ -289,8 +298,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate>
extends Schedu
         LOG.warn("Error while deleting: " + filePath, e);
       }
     }
-
-    return deletedFileCount == files.size();
+    return deletedFileCount;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/9facfa55/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 89c316b..3a68252 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -17,22 +17,33 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+
+import com.google.common.annotations.VisibleForTesting;
 /**
  * This Chore, every time it runs, will clear the HFiles in the hfile archive
  * folder that are deletable for each HFile cleaner in the chain.
  */
 @InterfaceAudience.Private
-public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
+public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
+    ConfigurationObserver {
 
   public static final String MASTER_HFILE_CLEANER_PLUGINS = "hbase.master.hfilecleaner.plugins";
 
@@ -41,6 +52,34 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
{
     this(period, stopper, conf, fs, directory, null);
   }
 
+  // Configuration key for large/small throttle point
+  public final static String HFILE_DELETE_THROTTLE_THRESHOLD =
+      "hbase.regionserver.thread.hfilecleaner.throttle";
+  public final static int DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD = 64 * 1024 * 1024;// 64M
+
+  // Configuration key for large queue size
+  public final static String LARGE_HFILE_DELETE_QUEUE_SIZE =
+      "hbase.regionserver.hfilecleaner.large.queue.size";
+  public final static int DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE = 1048576;
+
+  // Configuration key for small queue size
+  public final static String SMALL_HFILE_DELETE_QUEUE_SIZE =
+      "hbase.regionserver.hfilecleaner.small.queue.size";
+  public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576;
+
+  private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
+
+  BlockingQueue<HFileDeleteTask> largeFileQueue;
+  BlockingQueue<HFileDeleteTask> smallFileQueue;
+  private int throttlePoint;
+  private int largeQueueSize;
+  private int smallQueueSize;
+  private List<Thread> threads = new ArrayList<Thread>();
+  private boolean running;
+
+  private long deletedLargeFiles = 0L;
+  private long deletedSmallFiles = 0L;
+
   /**
    * @param period the period of time to sleep between each run
    * @param stopper the stopper
@@ -53,6 +92,15 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
{
                       Path directory, Map<String, Object> params) {
     super("HFileCleaner", period, stopper, conf, fs,
       directory, MASTER_HFILE_CLEANER_PLUGINS, params);
+    throttlePoint =
+        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+    largeQueueSize =
+        conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
+    smallQueueSize =
+        conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+    largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
+    smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+    startHFileDeleteThreads();
   }
 
   @Override
@@ -69,4 +117,267 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate>
{
   public List<BaseHFileCleanerDelegate> getDelegatesForTesting() {
     return this.cleanersChain;
   }
+
+  @Override
+  public int deleteFiles(Iterable<FileStatus> filesToDelete) {
+    int deletedFiles = 0;
+    List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
+    // construct delete tasks and add into relative queue
+    for (FileStatus file : filesToDelete) {
+      HFileDeleteTask task = deleteFile(file);
+      if (task != null) {
+        tasks.add(task);
+      }
+    }
+    // wait for each submitted task to finish
+    for (HFileDeleteTask task : tasks) {
+      if (task.getResult()) {
+        deletedFiles++;
+      }
+    }
+    return deletedFiles;
+  }
+
+  /**
+   * Construct an {@link HFileDeleteTask} for each file to delete and add into the correct
queue
+   * @param file the file to delete
+   * @return HFileDeleteTask to track progress
+   */
+  private HFileDeleteTask deleteFile(FileStatus file) {
+    HFileDeleteTask task = new HFileDeleteTask(file);
+    boolean enqueued = dispatch(task);
+    return enqueued ? task : null;
+  }
+
+  private boolean dispatch(HFileDeleteTask task) {
+    if (task.fileLength >= this.throttlePoint) {
+      if (!this.largeFileQueue.offer(task)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Large file deletion queue is full");
+        }
+        return false;
+      }
+    } else {
+      if (!this.smallFileQueue.offer(task)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Small file deletion queue is full");
+        }
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void cleanup() {
+    super.cleanup();
+    stopHFileDeleteThreads();
+  }
+
+  /**
+   * Start threads for hfile deletion
+   */
+  private void startHFileDeleteThreads() {
+    final String n = Thread.currentThread().getName();
+    running = true;
+    // start thread for large file deletion
+    Thread large = new Thread() {
+      @Override
+      public void run() {
+        consumerLoop(largeFileQueue);
+      }
+    };
+    large.setDaemon(true);
+    large.setName(n + "-HFileCleaner.large-" + System.currentTimeMillis());
+    large.start();
+    LOG.debug("Starting hfile cleaner for large files: " + large.getName());
+    threads.add(large);
+
+    // start thread for small file deletion
+    Thread small = new Thread() {
+      @Override
+      public void run() {
+        consumerLoop(smallFileQueue);
+      }
+    };
+    small.setDaemon(true);
+    small.setName(n + "-HFileCleaner.small-" + System.currentTimeMillis());
+    small.start();
+    LOG.debug("Starting hfile cleaner for small files: " + small.getName());
+    threads.add(small);
+  }
+
+  protected void consumerLoop(BlockingQueue<HFileDeleteTask> queue) {
+    try {
+      while (running) {
+        HFileDeleteTask task = null;
+        try {
+          task = queue.take();
+        } catch (InterruptedException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Interrupted while trying to take a task from queue", e);
+          }
+          break;
+        }
+        if (task != null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Removing: " + task.filePath + " from archive");
+          }
+          boolean succeed;
+          try {
+            succeed = this.fs.delete(task.filePath, false);
+          } catch (IOException e) {
+            LOG.warn("Failed to delete file " + task.filePath, e);
+            succeed = false;
+          }
+          task.setResult(succeed);
+          if (succeed) {
+            countDeletedFiles(queue == largeFileQueue);
+          }
+        }
+      }
+    } finally {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exit thread: " + Thread.currentThread());
+      }
+    }
+  }
+
+  // Currently only for testing purpose
+  private void countDeletedFiles(boolean isLarge) {
+    if (isLarge) {
+      if (deletedLargeFiles == Long.MAX_VALUE) {
+        LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0");
+        deletedLargeFiles = 0L;
+      }
+      deletedLargeFiles++;
+    } else {
+      if (deletedSmallFiles == Long.MAX_VALUE) {
+        LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0");
+        deletedSmallFiles = 0L;
+      }
+      deletedSmallFiles++;
+    }
+  }
+
+  /**
+   * Stop threads for hfile deletion
+   */
+  private void stopHFileDeleteThreads() {
+    running = false;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stopping file delete threads");
+    }
+    for(Thread thread: threads){
+      thread.interrupt();
+    }
+  }
+
+  static class HFileDeleteTask {
+    private static final long MAX_WAIT = 60 * 1000L;
+    private static final long WAIT_UNIT = 1000L;
+
+    boolean done = false;
+    boolean result;
+    final Path filePath;
+    final long fileLength;
+
+    public HFileDeleteTask(FileStatus file) {
+      this.filePath = file.getPath();
+      this.fileLength = file.getLen();
+    }
+
+    public synchronized void setResult(boolean result) {
+      this.done = true;
+      this.result = result;
+      notify();
+    }
+
+    public synchronized boolean getResult() {
+      long waitTime = 0;
+      try {
+        while (!done) {
+          wait(WAIT_UNIT);
+          waitTime += WAIT_UNIT;
+          if (done) {
+            return this.result;
+          }
+          if (waitTime > MAX_WAIT) {
+            LOG.warn("Wait more than " + MAX_WAIT + " ms for deleting " + this.filePath
+                + ", exit...");
+            return false;
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for result of deleting " + filePath
+            + ", will return false", e);
+        return false;
+      }
+      return this.result;
+    }
+  }
+
+  @VisibleForTesting
+  public List<Thread> getCleanerThreads() {
+    return threads;
+  }
+
+  @VisibleForTesting
+  public long getNumOfDeletedLargeFiles() {
+    return deletedLargeFiles;
+  }
+
+  @VisibleForTesting
+  public long getNumOfDeletedSmallFiles() {
+    return deletedSmallFiles;
+  }
+
+  @VisibleForTesting
+  public long getLargeQueueSize() {
+    return largeQueueSize;
+  }
+
+  @VisibleForTesting
+  public long getSmallQueueSize() {
+    return smallQueueSize;
+  }
+
+  @VisibleForTesting
+  public long getThrottlePoint() {
+    return throttlePoint;
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("Updating configuration for HFileCleaner, previous throttle point: ")
+        .append(throttlePoint).append(", largeQueueSize: ").append(largeQueueSize)
+        .append(", smallQueueSize: ").append(smallQueueSize);
+    stopHFileDeleteThreads();
+    this.throttlePoint =
+        conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD);
+    this.largeQueueSize =
+        conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
+    this.smallQueueSize =
+        conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+    // record the left over tasks
+    List<HFileDeleteTask> leftOverTasks = new ArrayList<>();
+    for (HFileDeleteTask task : largeFileQueue) {
+      leftOverTasks.add(task);
+    }
+    for (HFileDeleteTask task : smallFileQueue) {
+      leftOverTasks.add(task);
+    }
+    largeFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(largeQueueSize);
+    smallFileQueue = new LinkedBlockingQueue<HFileCleaner.HFileDeleteTask>(smallQueueSize);
+    threads.clear();
+    builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueSize:
")
+        .append(largeQueueSize).append(", smallQueueSize: ").append(smallQueueSize);
+    LOG.debug(builder.toString());
+    startHFileDeleteThreads();
+    // re-dispatch the left over tasks
+    for (HFileDeleteTask task : leftOverTasks) {
+      dispatch(task);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9facfa55/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 298f538..8d4ea4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1295,7 +1295,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       throw new RegionServerStoppedException("File system not available");
     }
     if (!regionServer.isOnline()) {
-      throw new ServerNotRunningYetException("Server is not running yet");
+      throw new ServerNotRunningYetException("Server " + regionServer.serverName
+          + " is not running yet");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9facfa55/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 6049701..8e8a4dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -22,10 +22,12 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -260,4 +263,153 @@ public class TestHFileCleaner {
       return null;
     }
   }
+
+  @Test
+  public void testThreadCleanup() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+    Server server = new DummyServer();
+    Path archivedHfileDir =
+        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+    // setup the cleaner
+    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+    // clean up archive directory
+    fs.delete(archivedHfileDir, true);
+    fs.mkdirs(archivedHfileDir);
+    // create some file to delete
+    fs.createNewFile(new Path(archivedHfileDir, "dfd-dfd"));
+    // launch the chore
+    cleaner.chore();
+    // call cleanup
+    cleaner.cleanup();
+    // wait awhile for thread to die
+    Thread.sleep(100);
+    for (Thread thread : cleaner.getCleanerThreads()) {
+      Assert.assertFalse(thread.isAlive());
+    }
+  }
+
+  @Test
+  public void testLargeSmallIsolation() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    // no cleaner policies = delete all files
+    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+    conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, 512 * 1024);
+    Server server = new DummyServer();
+    Path archivedHfileDir =
+        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+    // setup the cleaner
+    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+    // clean up archive directory
+    fs.delete(archivedHfileDir, true);
+    fs.mkdirs(archivedHfileDir);
+    // necessary set up
+    final int LARGE_FILE_NUM = 5;
+    final int SMALL_FILE_NUM = 20;
+    createFilesForTesting(LARGE_FILE_NUM, SMALL_FILE_NUM, fs, archivedHfileDir);
+    // call cleanup
+    cleaner.chore();
+
+    Assert.assertEquals(LARGE_FILE_NUM, cleaner.getNumOfDeletedLargeFiles());
+    Assert.assertEquals(SMALL_FILE_NUM, cleaner.getNumOfDeletedSmallFiles());
+  }
+
+  @Test(timeout = 60 * 1000)
+  public void testOnConfigurationChange() throws Exception {
+    // constants
+    final int ORIGINAL_THROTTLE_POINT = 512 * 1024;
+    final int ORIGINAL_QUEUE_SIZE = 512;
+    final int UPDATE_THROTTLE_POINT = 1024;// small enough to change large/small check
+    final int UPDATE_QUEUE_SIZE = 1024;
+    final int LARGE_FILE_NUM = 5;
+    final int SMALL_FILE_NUM = 20;
+
+    Configuration conf = UTIL.getConfiguration();
+    // no cleaner policies = delete all files
+    conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, "");
+    conf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, ORIGINAL_THROTTLE_POINT);
+    conf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+    conf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, ORIGINAL_QUEUE_SIZE);
+    Server server = new DummyServer();
+    Path archivedHfileDir =
+        new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+
+    // setup the cleaner
+    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
+    final HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir);
+    Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
+    Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize());
+    Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize());
+
+    // clean up archive directory and create files for testing
+    fs.delete(archivedHfileDir, true);
+    fs.mkdirs(archivedHfileDir);
+    createFilesForTesting(LARGE_FILE_NUM, SMALL_FILE_NUM, fs, archivedHfileDir);
+
+    // call cleaner, run as daemon to test the interrupt-at-middle case
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        cleaner.chore();
+      }
+    };
+    t.setDaemon(true);
+    t.start();
+    // let the cleaner run for some while
+    Thread.sleep(20);
+
+    // trigger configuration change
+    Configuration newConf = new Configuration(conf);
+    newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
+    newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
+    newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
+    cleaner.onConfigurationChange(newConf);
+    LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
+        + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+
+    // check values after change
+    Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
+    Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize());
+    Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize());
+    Assert.assertEquals(2, cleaner.getCleanerThreads().size());
+
+    // wait until clean done and check
+    t.join();
+    LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
+        + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
+    Assert.assertTrue("Should delete more than " + LARGE_FILE_NUM
+        + " files from large queue but actually " + cleaner.getNumOfDeletedLargeFiles(),
+      cleaner.getNumOfDeletedLargeFiles() > LARGE_FILE_NUM);
+    Assert.assertTrue("Should delete less than " + SMALL_FILE_NUM
+        + " files from small queue but actually " + cleaner.getNumOfDeletedSmallFiles(),
+      cleaner.getNumOfDeletedSmallFiles() < SMALL_FILE_NUM);
+  }
+
+  private void createFilesForTesting(int largeFileNum, int smallFileNum, FileSystem fs,
+      Path archivedHfileDir) throws IOException {
+    final Random rand = new Random();
+    final byte[] large = new byte[1024 * 1024];
+    for (int i = 0; i < large.length; i++) {
+      large[i] = (byte) rand.nextInt(128);
+    }
+    final byte[] small = new byte[1024];
+    for (int i = 0; i < small.length; i++) {
+      small[i] = (byte) rand.nextInt(128);
+    }
+    // create large and small files
+    for (int i = 1; i <= largeFileNum; i++) {
+      FSDataOutputStream out = fs.create(new Path(archivedHfileDir, "large-file-" + i));
+      out.write(large);
+      out.close();
+    }
+    for (int i = 1; i <= smallFileNum; i++) {
+      FSDataOutputStream out = fs.create(new Path(archivedHfileDir, "small-file-" + i));
+      out.write(small);
+      out.close();
+    }
+  }
 }


Mime
View raw message