bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-1004: Allow bookie garbage collection to be triggered manu…
Date Tue, 28 Mar 2017 20:39:24 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master bf8ef14bc -> 825e0e7b4


BOOKKEEPER-1004: Allow bookie garbage collection to be triggered manu…

Ran CompactionTest#testForceGarbageCollection

Author: Govind Menon <govindappumenon@gmail.com>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>

Closes #109 from govind-menon/BOOKKEEPER-1004


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/825e0e7b
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/825e0e7b
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/825e0e7b

Branch: refs/heads/master
Commit: 825e0e7b4a27730d85afbe823a405ba333de3a65
Parents: bf8ef14
Author: Govind Menon <govindappumenon@gmail.com>
Authored: Tue Mar 28 13:39:22 2017 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Tue Mar 28 13:39:22 2017 -0700

----------------------------------------------------------------------
 .../bookie/GarbageCollectorThread.java          | 136 +++++++++++--------
 .../bookkeeper/bookie/BookieAccessor.java       |   1 +
 .../bookkeeper/bookie/CompactionTest.java       |   4 +-
 3 files changed, 80 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/825e0e7b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index aa42475..f4b35f8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -29,16 +29,22 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,13 +52,16 @@ import org.slf4j.LoggerFactory;
  * This is the garbage collector thread that runs in the background to
  * remove any entry log files that no longer contains any active ledger.
  */
-public class GarbageCollectorThread extends BookieThread {
+public class GarbageCollectorThread extends SafeRunnable {
     private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class);
     private static final int SECOND = 1000;
 
     // Maps entry log files to the set of ledgers that comprise the file and the size usage
per ledger
     private Map<Long, EntryLogMetadata> entryLogMetaMap = new ConcurrentHashMap<Long,
EntryLogMetadata>();
 
+    ScheduledExecutorService gcExecutor;
+    Future<?> scheduledFuture = null;
+
     // This is how often we want to run the Garbage Collector Thread (in milliseconds).
     final long gcWaitTime;
 
@@ -188,7 +197,9 @@ public class GarbageCollectorThread extends BookieThread {
                                   LedgerManager ledgerManager,
                                   final CompactableLedgerStorage ledgerStorage)
         throws IOException {
-        super("GarbageCollectorThread");
+        gcExecutor = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("GarbageCollectorThread-%d").build()
+        );
 
         this.entryLogger = ledgerStorage.getEntryLogger();
         this.ledgerStorage = ledgerStorage;
@@ -266,10 +277,10 @@ public class GarbageCollectorThread extends BookieThread {
         lastMinorCompactionTime = lastMajorCompactionTime = MathUtils.now();
     }
 
-    public synchronized void enableForceGC() {
+    public void enableForceGC() {
         if (forceGarbageCollection.compareAndSet(false, true)) {
             LOG.info("Forced garbage collection triggered by thread: {}", Thread.currentThread().getName());
-            notify();
+            triggerGC();
         }
     }
 
@@ -280,6 +291,13 @@ public class GarbageCollectorThread extends BookieThread {
         }
     }
 
+    /**
+     * Manually trigger GC (for testing)
+     */
+    Future<?> triggerGC() {
+        return gcExecutor.submit(this);
+    }
+
     public void suspendMajorGC() {
         if (suspendMajorCompaction.compareAndSet(false, true)) {
             LOG.info("Suspend Major Compaction triggered by thread: {}", Thread.currentThread().getName());
@@ -304,64 +322,60 @@ public class GarbageCollectorThread extends BookieThread {
         }
     }
 
-    @Override
-    public void run() {
-        while (running) {
-            synchronized (this) {
-                try {
-                    wait(gcWaitTime);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    continue;
-                }
-            }
-
-            boolean force = forceGarbageCollection.get();
-            if (force) {
-                LOG.info("Garbage collector thread forced to perform GC before expiry of
wait time.");
-            }
+    public void start() {
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+        }
+        scheduledFuture = gcExecutor.scheduleAtFixedRate(this, gcWaitTime, gcWaitTime, TimeUnit.MILLISECONDS);
+    }
 
-            // Extract all of the ledger ID's that comprise all of the entry logs
-            // (except for the current new one which is still being written to).
-            entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap);
+    @Override
+    public void safeRun() {
+        boolean force = forceGarbageCollection.get();
+        if (force) {
+            LOG.info("Garbage collector thread forced to perform GC before expiry of wait
time.");
+        }
 
-            // gc inactive/deleted ledgers
-            doGcLedgers();
+        // Extract all of the ledger ID's that comprise all of the entry logs
+        // (except for the current new one which is still being written to).
+        entryLogMetaMap = extractMetaFromEntryLogs(entryLogMetaMap);
 
-            // gc entry logs
-            doGcEntryLogs();
+        // gc inactive/deleted ledgers
+        doGcLedgers();
 
-            boolean suspendMajor = suspendMajorCompaction.get();
-            boolean suspendMinor = suspendMinorCompaction.get();
-            if (suspendMajor) {
-                LOG.info("Disk almost full, suspend major compaction to slow down filling
disk.");
-            }
-            if (suspendMinor) {
-                LOG.info("Disk full, suspend minor compaction to slow down filling disk.");
-            }
+        // gc entry logs
+        doGcEntryLogs();
 
-            long curTime = MathUtils.now();
-            if (enableMajorCompaction && (!suspendMajor) &&
-                (force || curTime - lastMajorCompactionTime > majorCompactionInterval))
{
-                // enter major compaction
-                LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
-                doCompactEntryLogs(majorCompactionThreshold);
-                lastMajorCompactionTime = MathUtils.now();
-                // also move minor compaction time
-                lastMinorCompactionTime = lastMajorCompactionTime;
-                continue;
-            }
+        boolean suspendMajor = suspendMajorCompaction.get();
+        boolean suspendMinor = suspendMinorCompaction.get();
+        if (suspendMajor) {
+            LOG.info("Disk almost full, suspend major compaction to slow down filling disk.");
+        }
+        if (suspendMinor) {
+            LOG.info("Disk full, suspend minor compaction to slow down filling disk.");
+        }
 
-            if (enableMinorCompaction && (!suspendMinor) &&
-                (force || curTime - lastMinorCompactionTime > minorCompactionInterval))
{
-                // enter minor compaction
-                LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
-                doCompactEntryLogs(minorCompactionThreshold);
-                lastMinorCompactionTime = MathUtils.now();
-            }
+        long curTime = MathUtils.now();
+        if (enableMajorCompaction && (!suspendMajor) &&
+            (force || curTime - lastMajorCompactionTime > majorCompactionInterval)) {
+            // enter major compaction
+            LOG.info("Enter major compaction, suspendMajor {}", suspendMajor);
+            doCompactEntryLogs(majorCompactionThreshold);
+            lastMajorCompactionTime = MathUtils.now();
+            // and also move minor compaction time
+            lastMinorCompactionTime = lastMajorCompactionTime;
             forceGarbageCollection.set(false);
+            return;
+        }
+
+        if (enableMinorCompaction && (!suspendMinor) &&
+            (force || curTime - lastMinorCompactionTime > minorCompactionInterval)) {
+            // enter minor compaction
+            LOG.info("Enter minor compaction, suspendMinor {}", suspendMinor);
+            doCompactEntryLogs(minorCompactionThreshold);
+            lastMinorCompactionTime = MathUtils.now();
         }
-        LOG.info("GarbageCollectorThread exited loop!");
+        forceGarbageCollection.set(false);
     }
 
     /**
@@ -468,12 +482,16 @@ public class GarbageCollectorThread extends BookieThread {
     public void shutdown() throws InterruptedException {
         this.running = false;
         LOG.info("Shutting down GarbageCollectorThread");
-        if (compacting.compareAndSet(false, true)) {
-            // if setting compacting flag succeed, means gcThread is not compacting now
-            // it is safe to interrupt itself now
-            this.interrupt();
+
+        while (!compacting.compareAndSet(false, true)) {
+            // Wait till the thread stops compacting
+            Thread.sleep(100);
+        }
+        gcExecutor.shutdown();
+        if (gcExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+            LOG.warn("GC executor did not shut down in 60 seconds. Killing");
+            gcExecutor.shutdownNow();
         }
-        this.join();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/825e0e7b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java
index f49f8ae..07d810e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieAccessor.java
@@ -21,6 +21,7 @@
 package org.apache.bookkeeper.bookie;
 
 import java.io.IOException;
+import java.util.concurrent.Future;
 
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/825e0e7b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index ed881f1..6ae5e60 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -232,9 +232,9 @@ public class CompactionTest extends BookKeeperClusterTestCase {
                 dirManager, dirManager, cp, NullStatsLogger.INSTANCE);
         storage.start();
         long startTime = MathUtils.now();
-        Thread.sleep(2000);
         storage.gcThread.enableForceGC();
-        Thread.sleep(1000);
+        storage.gcThread.triggerGC().get(); //major
+        storage.gcThread.triggerGC().get(); //minor
         // Minor and Major compaction times should be larger than when we started
         // this test.
         assertTrue("Minor or major compaction did not trigger even on forcing.",


Mime
View raw message