hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [hbase] branch master updated: HBASE-19616 Review of LogCleaner Class
Date Tue, 05 Feb 2019 06:16:39 GMT
This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new af92322  HBASE-19616 Review of LogCleaner Class
af92322 is described below

commit af923225d0a874ecf3c7deddbc0d7bc82184e1d1
Author: stack <stack@apache.org>
AuthorDate: Mon Feb 4 22:16:26 2019 -0800

    HBASE-19616 Review of LogCleaner Class
---
 .../hadoop/hbase/master/cleaner/LogCleaner.java    | 153 +++++++++------------
 .../hbase/master/cleaner/TestLogsCleaner.java      | 146 ++++++++++----------
 2 files changed, 136 insertions(+), 163 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index db098e2..a7338c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -21,10 +21,11 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -36,7 +37,9 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
 /**
  * This Chore, every time it runs, will attempt to delete the WALs and Procedure WALs in the old
@@ -45,7 +48,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
  */
 @InterfaceAudience.Private
 public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
-  private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
 
   public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size";
   public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;
@@ -55,16 +58,9 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
   @VisibleForTesting
   static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;
 
-  public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC =
-      "hbase.oldwals.cleaner.thread.check.interval.msec";
-  @VisibleForTesting
-  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500L;
-
-
   private final LinkedBlockingQueue<CleanerContext> pendingDelete;
   private List<Thread> oldWALsCleaner;
   private long cleanerThreadTimeoutMsec;
-  private long cleanerThreadCheckIntervalMsec;
 
   /**
    * @param period the period of time to sleep between each run
@@ -81,8 +77,6 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
     this.oldWALsCleaner = createOldWalsCleaner(size);
     this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
         DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
-    this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
-        DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
   }
 
   @Override
@@ -97,35 +91,33 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
 
     int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
     if (newSize == oldWALsCleaner.size()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Size from configuration is the same as previous which is " +
-          newSize + ", no need to update.");
-      }
+      LOG.debug("Size from configuration is the same as previous which "
+          + "is {}, no need to update.", newSize);
       return;
     }
     interruptOldWALsCleaner();
     oldWALsCleaner = createOldWalsCleaner(newSize);
     cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
         DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
-    cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
-        DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
   }
 
   @Override
   protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
-    List<CleanerContext> results = new LinkedList<>();
-    for (FileStatus toDelete : filesToDelete) {
-      CleanerContext context = CleanerContext.createCleanerContext(toDelete,
-          cleanerThreadTimeoutMsec);
-      if (context != null) {
-        pendingDelete.add(context);
-        results.add(context);
+    List<CleanerContext> results = new ArrayList<>();
+    for (FileStatus file : filesToDelete) {
+      LOG.trace("Scheduling file {} for deletion", file);
+      if (file != null) {
+        results.add(new CleanerContext(file));
       }
     }
 
+    LOG.debug("Old WAL files pending deletion: {}", results);
+    pendingDelete.addAll(results);
+
     int deletedFiles = 0;
     for (CleanerContext res : results) {
-      deletedFiles += res.getResult(cleanerThreadCheckIntervalMsec) ? 1 : 0;
+      LOG.trace("Awaiting the results for deletion of old WAL file: {}", res);
+      deletedFiles += res.getResult(this.cleanerThreadTimeoutMsec) ? 1 : 0;
     }
     return deletedFiles;
   }
@@ -146,13 +138,8 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
     return cleanerThreadTimeoutMsec;
   }
 
-  @VisibleForTesting
-  long getCleanerThreadCheckIntervalMsec() {
-    return cleanerThreadCheckIntervalMsec;
-  }
-
   private List<Thread> createOldWalsCleaner(int size) {
-    LOG.info("Creating OldWALs cleaners with size=" + size);
+    LOG.info("Creating {} OldWALs cleaner threads", size);
 
     List<Thread> oldWALsCleaner = new ArrayList<>(size);
     for (int i = 0; i < size; i++) {
@@ -167,6 +154,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
 
   private void interruptOldWALsCleaner() {
     for (Thread cleaner : oldWALsCleaner) {
+      LOG.trace("Interrupting thread: {}", cleaner);
       cleaner.interrupt();
     }
     oldWALsCleaner.clear();
@@ -174,95 +162,78 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
 
   private void deleteFile() {
     while (true) {
-      CleanerContext context = null;
-      boolean succeed = false;
-      boolean interrupted = false;
       try {
-        context = pendingDelete.take();
-        if (context != null) {
-          FileStatus toClean = context.getTargetToClean();
-          succeed = this.fs.delete(toClean.getPath(), false);
-        }
-      } catch (InterruptedException ite) {
-        // It's most likely from configuration changing request
-        if (context != null) {
-          LOG.warn("Interrupted while cleaning oldWALs " +
-              context.getTargetToClean() + ", try to clean it next round.");
-        }
-        interrupted = true;
-      } catch (IOException e) {
-        // fs.delete() fails.
-        LOG.warn("Failed to clean oldwals with exception: " + e);
-        succeed = false;
-      } finally {
-        if (context != null) {
+        final CleanerContext context = pendingDelete.take();
+        Preconditions.checkNotNull(context);
+        FileStatus oldWalFile = context.getTargetToClean();
+        try {
+          LOG.debug("Attempting to delete old WAL file: {}", oldWalFile);
+          boolean succeed = this.fs.delete(oldWalFile.getPath(), false);
           context.setResult(succeed);
+        } catch (IOException e) {
+          // fs.delete() fails.
+          LOG.warn("Failed to clean old WAL file", e);
+          context.setResult(false);
         }
-        if (interrupted) {
-          // Restore interrupt status
-          Thread.currentThread().interrupt();
-          break;
-        }
+      } catch (InterruptedException ite) {
+        // It is most likely from configuration changing request
+        LOG.warn("Interrupted while cleaning old WALs, will "
+            + "try to clean it next round. Exiting.");
+        // Restore interrupt status
+        Thread.currentThread().interrupt();
+        return;
       }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Exiting cleaner.");
+      LOG.debug("Exiting");
     }
   }
 
   @Override
   public synchronized void cancel(boolean mayInterruptIfRunning) {
+    LOG.debug("Cancelling LogCleaner");
     super.cancel(mayInterruptIfRunning);
-    for (Thread t : oldWALsCleaner) {
-      t.interrupt();
-    }
+    interruptOldWALsCleaner();
   }
 
   private static final class CleanerContext {
 
     final FileStatus target;
-    volatile boolean result;
-    volatile boolean setFromCleaner = false;
-    long timeoutMsec;
-
-    static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) {
-      return status != null ? new CleanerContext(status, timeoutMsec) : null;
-    }
+    final AtomicBoolean result;
+    final CountDownLatch remainingResults;
 
-    private CleanerContext(FileStatus status, long timeoutMsec) {
+    private CleanerContext(FileStatus status) {
       this.target = status;
-      this.result = false;
-      this.timeoutMsec = timeoutMsec;
+      this.result = new AtomicBoolean(false);
+      this.remainingResults = new CountDownLatch(1);
     }
 
-    synchronized void setResult(boolean res) {
-      this.result = res;
-      this.setFromCleaner = true;
-      notify();
+    void setResult(boolean res) {
+      this.result.set(res);
+      this.remainingResults.countDown();
     }
 
-    synchronized boolean getResult(long waitIfNotFinished) {
-      long totalTimeMsec = 0;
+    boolean getResult(long waitIfNotFinished) {
       try {
-        while (!setFromCleaner) {
-          long startTimeNanos = System.nanoTime();
-          wait(waitIfNotFinished);
-          totalTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
-              TimeUnit.NANOSECONDS);
-          if (totalTimeMsec >= timeoutMsec) {
-            LOG.warn("Spend too much time " + totalTimeMsec + " ms to delete oldwals " + target);
-            return result;
-          }
+        boolean completed = this.remainingResults.await(waitIfNotFinished,
+            TimeUnit.MILLISECONDS);
+        if (!completed) {
+          LOG.warn("Spend too much time [{}ms] to delete old WAL file: {}",
+              waitIfNotFinished, target);
+          return false;
         }
       } catch (InterruptedException e) {
-        LOG.warn("Interrupted while waiting deletion of " + target);
-        return result;
+        LOG.warn("Interrupted while awaiting deletion of WAL file: {}", target);
+        return false;
       }
-      return result;
+      return result.get();
     }
 
     FileStatus getTargetToClean() {
       return target;
     }
+
+    @Override
+    public String toString() {
+      return "CleanerContext [target=" + target + ", result=" + result + "]";
+    }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 247ed01..4d254a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -26,10 +26,15 @@ import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -58,6 +63,7 @@ import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -67,8 +73,6 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
 @Category({MasterTests.class, MediumTests.class})
 public class TestLogsCleaner {
 
@@ -79,6 +83,14 @@ public class TestLogsCleaner {
   private static final Logger LOG = LoggerFactory.getLogger(TestLogsCleaner.class);
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
+  private static final Path OLD_WALS_DIR =
+      new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
+
+  private static final Path OLD_PROCEDURE_WALS_DIR =
+      new Path(OLD_WALS_DIR, "masterProcedureWALs");
+
+  private static Configuration conf;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
@@ -92,6 +104,18 @@ public class TestLogsCleaner {
     TEST_UTIL.shutdownMiniDFSCluster();
   }
 
+  @Before
+  public void beforeTest() throws IOException {
+    conf = TEST_UTIL.getConfiguration();
+
+    FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
+
+    fs.delete(OLD_WALS_DIR, true);
+
+    // root directory
+    fs.mkdirs(OLD_WALS_DIR);
+  }
+
   /**
    * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located
    * in the same oldWALs directory.
@@ -111,7 +135,6 @@ public class TestLogsCleaner {
    */
   @Test
   public void testLogCleaning() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
     // set TTLs
     long ttlWAL = 2000;
     long ttlProcedureWAL = 4000;
@@ -122,23 +145,23 @@ public class TestLogsCleaner {
     Server server = new DummyServer();
     ReplicationQueueStorage queueStorage =
         ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
-    final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
-    final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs");
-    String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8");
+
+    String fakeMachineName = URLEncoder.encode(
+        server.getServerName().toString(), StandardCharsets.UTF_8.name());
 
     final FileSystem fs = FileSystem.get(conf);
+    fs.mkdirs(OLD_PROCEDURE_WALS_DIR);
 
-    long now = System.currentTimeMillis();
-    fs.delete(oldLogDir, true);
-    fs.mkdirs(oldLogDir);
+    final long now = System.currentTimeMillis();
 
     // Case 1: 2 invalid files, which would be deleted directly
-    fs.createNewFile(new Path(oldLogDir, "a"));
-    fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + "a"));
+    fs.createNewFile(new Path(OLD_WALS_DIR, "a"));
+    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + "a"));
 
     // Case 2: 5 Procedure WALs that are old which would be deleted
-    for (int i = 1; i < 6; i++) {
-      Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i));
+    for (int i = 1; i <= 5; i++) {
+      final Path fileName =
+          new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
       fs.createNewFile(fileName);
     }
 
@@ -146,66 +169,65 @@ public class TestLogsCleaner {
     Thread.sleep(ttlProcedureWAL - ttlWAL);
 
     // Case 3: old WALs which would be deletable
-    for (int i = 1; i < 31; i++) {
-      Path fileName = new Path(oldLogDir, fakeMachineName + "." + (now - i));
+    for (int i = 1; i <= 30; i++) {
+      Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
       fs.createNewFile(fileName);
       // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
       // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
-      if (i % (30 / 3) == 1) {
+      if (i % (30 / 3) == 0) {
         queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
         LOG.info("Replication log file: " + fileName);
       }
     }
 
     // Case 5: 5 Procedure WALs that are new, will stay
-    for (int i = 6; i < 11; i++) {
-      Path fileName = new Path(oldProcedureWALDir, String.format("pv2-%020d.log", i));
+    for (int i = 6; i <= 10; i++) {
+      Path fileName =
+          new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
       fs.createNewFile(fileName);
     }
 
     // Sleep for sometime to get newer modification time
     Thread.sleep(ttlWAL);
-    fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + now));
+    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + now));
 
     // Case 6: 1 newer WAL, not even deletable for TimeToLiveLogCleaner,
     // so we are not going down the chain
-    fs.createNewFile(new Path(oldLogDir, fakeMachineName + "." + (now + ttlWAL)));
+    fs.createNewFile(new Path(OLD_WALS_DIR, fakeMachineName + "." + (now + ttlWAL)));
 
-    for (FileStatus stat : fs.listStatus(oldLogDir)) {
-      LOG.info(stat.getPath().toString());
-    }
+    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
+    LOG.info("File status: {}", Arrays.toString(status));
 
-    // There should be 34 files and masterProcedureWALs directory
-    assertEquals(35, fs.listStatus(oldLogDir).length);
+    // There should be 34 files and 1 masterProcedureWALs directory
+    assertEquals(35, fs.listStatus(OLD_WALS_DIR).length);
     // 10 procedure WALs
-    assertEquals(10, fs.listStatus(oldProcedureWALDir).length);
+    assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);
 
-    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir);
+    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR);
     cleaner.chore();
 
     // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
     // are scheduled for replication and masterProcedureWALs directory
-    TEST_UTIL.waitFor(1000,
-        (Waiter.Predicate<Exception>) () -> 6 == fs.listStatus(oldLogDir).length);
+    TEST_UTIL.waitFor(1000, (Waiter.Predicate<Exception>) () -> 6 == fs
+        .listStatus(OLD_WALS_DIR).length);
     // In masterProcedureWALs we end up with 5 newer Procedure WALs
-    TEST_UTIL.waitFor(1000,
-        (Waiter.Predicate<Exception>) () -> 5 == fs.listStatus(oldProcedureWALDir).length);
-
-    for (FileStatus file : fs.listStatus(oldLogDir)) {
-      LOG.debug("Kept log file in oldWALs: " + file.getPath().getName());
-    }
-    for (FileStatus file : fs.listStatus(oldProcedureWALDir)) {
-      LOG.debug("Kept log file in masterProcedureWALs: " + file.getPath().getName());
+    TEST_UTIL.waitFor(1000, (Waiter.Predicate<Exception>) () -> 5 == fs
+        .listStatus(OLD_PROCEDURE_WALS_DIR).length);
+
+    if (LOG.isDebugEnabled()) {
+      FileStatus[] statusOldWALs = fs.listStatus(OLD_WALS_DIR);
+      FileStatus[] statusProcedureWALs = fs.listStatus(OLD_PROCEDURE_WALS_DIR);
+      LOG.debug("Kept log file for oldWALs: {}", Arrays.toString(statusOldWALs));
+      LOG.debug("Kept log file for masterProcedureWALs: {}",
+          Arrays.toString(statusProcedureWALs));
     }
   }
 
   @Test(timeout=10000)
   public void testZooKeeperAbortDuringGetListOfReplicators() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-
     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
 
-    List<FileStatus> dummyFiles = Lists.newArrayList(
+    List<FileStatus> dummyFiles = Arrays.asList(
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
     );
@@ -224,7 +246,7 @@ public class TestLogsCleaner {
           try {
             return invocation.callRealMethod();
           } catch (ReplicationException e) {
-            LOG.debug("caught " + e);
+            LOG.debug("Caught Exception", e);
             getListOfReplicatorsFailed.set(true);
             throw e;
           }
@@ -250,10 +272,9 @@ public class TestLogsCleaner {
    */
   @Test(timeout=10000)
   public void testZooKeeperNormal() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
 
-    List<FileStatus> dummyFiles = Lists.newArrayList(
+    List<FileStatus> dummyFiles = Arrays.asList(
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
     );
@@ -276,30 +297,18 @@ public class TestLogsCleaner {
 
   @Test
   public void testOnConfigurationChange() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE,
-        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
-    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
-        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
-    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
-        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
     // Prepare environments
     Server server = new DummyServer();
-    Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
-        HConstants.HREGION_OLDLOGDIR_NAME);
+
     FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
-    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir);
+    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR);
     assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners());
     assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
         cleaner.getCleanerThreadTimeoutMsec());
-    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
-        cleaner.getCleanerThreadCheckIntervalMsec());
     // Create dir and files for test
-    fs.delete(oldWALsDir, true);
-    fs.mkdirs(oldWALsDir);
     int numOfFiles = 10;
-    createFiles(fs, oldWALsDir, numOfFiles);
-    FileStatus[] status = fs.listStatus(oldWALsDir);
+    createFiles(fs, OLD_WALS_DIR, numOfFiles);
+    FileStatus[] status = fs.listStatus(OLD_WALS_DIR);
     assertEquals(numOfFiles, status.length);
     // Start cleaner chore
     Thread thread = new Thread(() -> cleaner.chore());
@@ -308,31 +317,24 @@ public class TestLogsCleaner {
     // change size of cleaners dynamically
     int sizeToChange = 4;
     long threadTimeoutToChange = 30 * 1000L;
-    long threadCheckIntervalToChange = 250L;
     conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange);
     conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
-    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
-        threadCheckIntervalToChange);
     cleaner.onConfigurationChange(conf);
     assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
     assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
-    assertEquals(threadCheckIntervalToChange, cleaner.getCleanerThreadCheckIntervalMsec());
     // Stop chore
     thread.join();
-    status = fs.listStatus(oldWALsDir);
+    status = fs.listStatus(OLD_WALS_DIR);
     assertEquals(0, status.length);
   }
 
   private void createFiles(FileSystem fs, Path parentDir, int numOfFiles) throws IOException {
-    Random random = new Random();
     for (int i = 0; i < numOfFiles; i++) {
-      int xMega = 1 + random.nextInt(3); // size of each file is between 1~3M
+      // size of each file is 1M, 2M, or 3M
+      int xMega = 1 + ThreadLocalRandom.current().nextInt(1, 4);
       try (FSDataOutputStream fsdos = fs.create(new Path(parentDir, "file-" + i))) {
-        for (int m = 0; m < xMega; m++) {
-          byte[] M = new byte[1024 * 1024];
-          random.nextBytes(M);
-          fsdos.write(M);
-        }
+        byte[] M = RandomUtils.nextBytes(Math.toIntExact(FileUtils.ONE_MB * xMega));
+        fsdos.write(M);
       }
     }
   }


Mime
View raw message