bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From f..@apache.org
Subject bookkeeper git commit: BOOKKEEPER-827 change throttle in GarbageCollector to use either "by entry" or "by byte" (Jia Zhai via fpj)
Date Fri, 16 Jan 2015 15:01:08 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 99b40326f -> 0d6a65d3b


BOOKKEEPER-827 change throttle in GarbageCollector to use either "by entry" or "by byte" (Jia
Zhai via fpj)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/0d6a65d3
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/0d6a65d3
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/0d6a65d3

Branch: refs/heads/master
Commit: 0d6a65d3b14122f0c0a3a4e9d6a4fbe569165ec7
Parents: 99b4032
Author: fpj <fpj@apache.org>
Authored: Fri Jan 16 14:57:03 2015 +0000
Committer: fpj <fpj@apache.org>
Committed: Fri Jan 16 14:57:03 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 bookkeeper-server/conf/bk_server.conf           |  9 +++
 .../bookie/GarbageCollectorThread.java          | 38 ++++++++--
 .../bookkeeper/conf/ServerConfiguration.java    | 73 +++++++++++++++++++-
 .../bookkeeper/bookie/CompactionTest.java       | 18 ++++-
 5 files changed, 133 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0d6a65d3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 66df55b..a61fbb4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,9 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-634: Provide admin tool to rename bookie identifier in ledger metadata (rakeshr via ivank)
 
+      BOOKKEEPER-827: change throttle in GarbageCollector to use 
+      either "by entry" or "by byte" (Jia Zhai via fpj)
+
       bookkeeper-client:
 
         BOOKKEEPER-810: Allow to configure TCP connect timeout (Charles Xie via sijie)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0d6a65d3/bookkeeper-server/conf/bk_server.conf
----------------------------------------------------------------------
diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index e38ea2b..53db095 100644
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -109,6 +109,15 @@ ledgerDirectories=/tmp/bk-data
 # Set the rate at which compaction will readd entries. The unit is adds per second.
 #compactionRate=1000
 
+# Throttle compaction by bytes (true) or by entries (false).
+#isThrottleByBytes=false
+
+# Set the rate at which compaction will re-add entries when throttling by entries. The unit is entries added per second.
+#compactionRateByEntries=1000
+
+# Set the rate at which compaction will re-add entries when throttling by bytes. The unit is bytes added per second.
+#compactionRateByBytes=1000000
+
 # Max file size of journal file, in mega bytes
 # A new journal file will be created when the old one reaches the file size limitation
 #

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0d6a65d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 74c6ec2..a1f044f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -69,8 +69,10 @@ public class GarbageCollectorThread extends BookieThread {
     long lastMinorCompactionTime;
     long lastMajorCompactionTime;
 
+    final boolean isThrottleByBytes;
     final int maxOutstandingRequests;
-    final int compactionRate;
+    final int compactionRateByEntries;
+    final int compactionRateByBytes;
     final CompactionScannerFactory scannerFactory;
 
     // Entry Logger Handle
@@ -106,6 +108,29 @@ public class GarbageCollectorThread extends BookieThread {
             this.offset = offset;
         }
     }
+ 
+    private static class Throttler {
+        final RateLimiter rateLimiter;
+        final boolean isThrottleByBytes;
+        final int compactionRateByBytes;
+        final int compactionRateByEntries;
+
+        Throttler(boolean isThrottleByBytes, 
+                  int compactionRateByBytes, 
+                  int compactionRateByEntries) {
+            this.isThrottleByBytes  = isThrottleByBytes;
+            this.compactionRateByBytes = compactionRateByBytes;
+            this.compactionRateByEntries = compactionRateByEntries;
+            this.rateLimiter = RateLimiter.create(this.isThrottleByBytes ? 
+                                                  this.compactionRateByBytes : 
+                                                  this.compactionRateByEntries);
+        }
+        
+        // Acquire permits: the entry's size in bytes when throttling by bytes, otherwise 1 per entry.
+        void acquire(int permits) {
+            rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
+        }
+    }
 
     /**
      * A scanner wrapper to check whether a ledger is alive in an entry log file
@@ -114,7 +139,10 @@ public class GarbageCollectorThread extends BookieThread {
         List<Offset> offsets = new ArrayList<Offset>();
 
         EntryLogScanner newScanner(final EntryLogMetadata meta) {
-            final RateLimiter rateLimiter = RateLimiter.create(compactionRate);
+            final Throttler throttler = new Throttler (isThrottleByBytes,
+                                                       compactionRateByBytes, 
+                                                       compactionRateByEntries);
+
             return new EntryLogScanner() {
                 @Override
                 public boolean accept(long ledgerId) {
@@ -124,7 +152,7 @@ public class GarbageCollectorThread extends BookieThread {
                 @Override
                 public void process(final long ledgerId, long offset, ByteBuffer entry)
                         throws IOException {
-                    rateLimiter.acquire();
+                    throttler.acquire(entry.remaining());
                     synchronized (CompactionScannerFactory.this) {
                         if (offsets.size() > maxOutstandingRequests) {
                             waitEntrylogFlushed();
@@ -207,8 +235,10 @@ public class GarbageCollectorThread extends BookieThread {
         this.activeLedgers = activeLedgers;
 
         this.gcWaitTime = conf.getGcWaitTime();
+        this.isThrottleByBytes = conf.getIsThrottleByBytes();
         this.maxOutstandingRequests = conf.getCompactionMaxOutstandingRequests();
-        this.compactionRate = conf.getCompactionRate();
+        this.compactionRateByEntries  = conf.getCompactionRateByEntries();
+        this.compactionRateByBytes = conf.getCompactionRateByBytes();
         this.scannerFactory = new CompactionScannerFactory();
         entryLogger.addListener(this.scannerFactory);
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0d6a65d3/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index a06e770..cd9f7a0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -38,9 +38,12 @@ public class ServerConfiguration extends AbstractConfiguration {
     protected final static String MINOR_COMPACTION_THRESHOLD = "minorCompactionThreshold";
     protected final static String MAJOR_COMPACTION_INTERVAL = "majorCompactionInterval";
     protected final static String MAJOR_COMPACTION_THRESHOLD = "majorCompactionThreshold";
+    protected final static String IS_THROTTLE_BY_BYTES = "isThrottleByBytes";
     protected final static String COMPACTION_MAX_OUTSTANDING_REQUESTS
         = "compactionMaxOutstandingRequests";
     protected final static String COMPACTION_RATE = "compactionRate";
+    protected final static String COMPACTION_RATE_BY_ENTRIES = "compactionRateByEntries";
+    protected final static String COMPACTION_RATE_BY_BYTES = "compactionRateByBytes";
 
     // Gc Parameters
     protected final static String GC_WAIT_TIME = "gcWaitTime";
@@ -1243,6 +1246,28 @@ public class ServerConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get whether garbage collector compaction is throttled by bytes rather than by entries.
+     *
+     * @return true  - use Bytes, 
+     *         false - use Entries.
+     */
+    public boolean getIsThrottleByBytes() {
+        return getBoolean(IS_THROTTLE_BY_BYTES, false);
+    }
+
+    /**
+     * Set whether garbage collector compaction is throttled by bytes rather than by entries.
+     *
+     * @param byBytes true to use by bytes; false to use by entries
+     *
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setIsThrottleByBytes(boolean byBytes) {
+        setProperty(IS_THROTTLE_BY_BYTES, byBytes);
+        return this;
+    }
+
+    /**
      * Get the maximum number of entries which can be compacted without flushing.
      * Default is 100,000.
      *
@@ -1272,12 +1297,14 @@ public class ServerConfiguration extends AbstractConfiguration {
         setProperty(COMPACTION_MAX_OUTSTANDING_REQUESTS, maxOutstandingRequests);
         return this;
     }
-
+    
     /**
      * Get the rate of compaction adds. Default is 1,000.
      *
      * @return rate of compaction (adds per second)
+     * @deprecated  replaced by {@link #getCompactionRateByEntries()}
      */
+    @Deprecated
     public int getCompactionRate() {
         return getInt(COMPACTION_RATE, 1000);
     }
@@ -1285,7 +1312,7 @@ public class ServerConfiguration extends AbstractConfiguration {
     /**
      * Set the rate of compaction adds.
      *
-     * @param rate rate of compaction adds (adds per second)
+     * @param rate rate of compaction adds (entries added per second)
      *
      * @return ServerConfiguration
      */
@@ -1295,6 +1322,48 @@ public class ServerConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get the rate of compaction adds, in entries. Defaults to the value of the deprecated {@link #getCompactionRate()} (1,000 unless configured).
+     *
+     * @return rate of compaction (entries added per second)
+     */
+    public int getCompactionRateByEntries() {
+        return getInt(COMPACTION_RATE_BY_ENTRIES, getCompactionRate());
+    }
+
+    /**
+     * Set the rate of compaction adds.
+     *
+     * @param rate rate of compaction adds (entries added per second)
+     *
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setCompactionRateByEntries(int rate) {
+        setProperty(COMPACTION_RATE_BY_ENTRIES, rate);
+        return this;
+    }
+
+    /**
+     * Get the rate of compaction adds, in bytes. Default is 1,000,000.
+     *
+     * @return rate of compaction (bytes added per second)
+     */
+    public int getCompactionRateByBytes() {
+        return getInt(COMPACTION_RATE_BY_BYTES, 1000000);
+    }
+
+    /**
+     * Set the rate of compaction adds.
+     *
+     * @param rate rate of compaction adds (bytes added per second)
+     *
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setCompactionRateByBytes(int rate) {
+        setProperty(COMPACTION_RATE_BY_BYTES, rate);
+        return this;
+    }
+
+    /**
      * Should we remove pages from page cache after force write
      *
      * @return remove pages from cache

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0d6a65d3/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 956595e..6c9c4a7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.Arrays;
+import java.util.Collection;
 
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -49,15 +51,25 @@ import org.apache.zookeeper.AsyncCallback;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.*;
-
 /**
  * This class tests the entry log compaction functionality.
  */
+@RunWith(Parameterized.class)
 public class CompactionTest extends BookKeeperClusterTestCase {
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {{true}, {false}});
+    }
+
+    private boolean isThrottleByBytes; 
+   
     private final static Logger LOG = LoggerFactory.getLogger(CompactionTest.class);
     DigestType digestType;
 
@@ -73,9 +85,10 @@ public class CompactionTest extends BookKeeperClusterTestCase {
 
     String msg;
 
-    public CompactionTest() {
+    public CompactionTest(boolean isByBytes) {
         super(NUM_BOOKIES);
 
+        this.isThrottleByBytes = isByBytes;
         this.digestType = DigestType.CRC32;
 
         numEntries = 100;
@@ -106,6 +119,7 @@ public class CompactionTest extends BookKeeperClusterTestCase {
         baseConf.setMajorCompactionInterval(majorCompactionInterval);
         baseConf.setEntryLogFilePreAllocationEnabled(false);
         baseConf.setSortedLedgerStorageEnabled(false);
+        baseConf.setIsThrottleByBytes(this.isThrottleByBytes);
 
         super.setUp();
     }


Mime
View raw message