hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1501499 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/compactions/ test/java/org/apache/hadoop/hbase/regionserver/compactions/
Date Tue, 09 Jul 2013 19:51:36 GMT
Author: sershe
Date: Tue Jul  9 19:51:36 2013
New Revision: 1501499

URL: http://svn.apache.org/r1501499
Log:
HBASE-8329 Limit compaction speed (binlijin)

Added:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PeakCompactionsThrottle.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPeakCompactionsThrottle.java
Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1501499&r1=1501498&r2=1501499&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
Tue Jul  9 19:51:36 2013
@@ -58,6 +58,7 @@ public abstract class Compactor {
 
   private int compactionKVMax;
   protected Compression.Algorithm compactionCompression;
+  private PeakCompactionsThrottle peakCompactionsThrottle;
 
   //TODO: depending on Store is not good but, realistically, all compactors currently do.
   Compactor(final Configuration conf, final Store store) {
@@ -66,6 +67,7 @@ public abstract class Compactor {
     this.compactionKVMax = this.conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
     this.compactionCompression = (this.store.getFamily() == null) ?
         Compression.Algorithm.NONE : this.store.getFamily().getCompactionCompression();
+    this.peakCompactionsThrottle = new PeakCompactionsThrottle(conf);
   }
 
   /**
@@ -201,6 +203,7 @@ public abstract class Compactor {
     // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
     int closeCheckInterval = HStore.getCloseCheckInterval();
     boolean hasMore;
+    peakCompactionsThrottle.startCompaction();
     do {
       hasMore = scanner.next(kvs, compactionKVMax);
       // output to writer:
@@ -222,9 +225,12 @@ public abstract class Compactor {
             }
           }
         }
+        peakCompactionsThrottle.throttle(kv.getLength());
       }
       kvs.clear();
     } while (hasMore);
+    peakCompactionsThrottle.finishCompaction(this.store.getRegionInfo()
+        .getRegionNameAsString(), this.store.getFamily().getNameAsString());
     progress.complete();
     return true;
   }

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java?rev=1501499&r1=1501498&r2=1501499&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java
Tue Jul  9 19:51:36 2013
@@ -37,6 +37,13 @@ public abstract class OffPeakHours {
     return getInstance(startHour, endHour);
   }
 
+  public static OffPeakHours getInstance(Configuration conf, String start,
+      String end) {
+    int startHour = conf.getInt(start, -1);
+    int endHour = conf.getInt(end, -1);
+    return getInstance(startHour, endHour);
+  }
+
   /**
    * @param startHour inclusive
    * @param endHour exclusive

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PeakCompactionsThrottle.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PeakCompactionsThrottle.java?rev=1501499&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PeakCompactionsThrottle.java
(added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PeakCompactionsThrottle.java
Tue Jul  9 19:51:36 2013
@@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * The class used to track peak hours and compactions. peak compaction speed
+ * should be limit.
+ * 
+ */
+@InterfaceAudience.Private
+public class PeakCompactionsThrottle {
+  private static final Log LOG = LogFactory.getLog(PeakCompactionsThrottle.class);
+
+  public static final String PEAK_START_HOUR = "hbase.peak.start.hour";
+  public static final String PEAK_END_HOUR = "hbase.peak.end.hour";
+  public static final String PEAK_COMPACTION_SPEED_ALLOWED =
+      "hbase.regionserver.compaction.peak.maxspeed";
+  public static final String PEAK_COMPACTION_SPEED_CHECK_INTERVAL =
+      "hbase.regionserver.compaction.speed.check.interval";
+
+  OffPeakHours offPeakHours;
+  private long start;
+  private long end;
+  private long maxSpeedInPeak;
+  private int sleepNumber = 0;
+  private int sleepTimeTotal = 0;
+  int bytesWritten = 0;
+  int checkInterval = 0;
+
+  public PeakCompactionsThrottle(Configuration conf) {
+    offPeakHours = OffPeakHours.getInstance(conf, PEAK_START_HOUR, PEAK_END_HOUR);
+    maxSpeedInPeak = conf.getInt(PEAK_COMPACTION_SPEED_ALLOWED, 30 * 1024 * 1024 /* 30 MB/s
*/);
+    checkInterval = conf.getInt(PEAK_COMPACTION_SPEED_CHECK_INTERVAL, 30 * 1024 * 1024 /*
30 MB */);
+  }
+
+  /**
+   * start compaction
+   */
+  public void startCompaction() {
+    start = System.currentTimeMillis();
+  }
+
+  /**
+   * finish compaction
+   */
+  public void finishCompaction(String region, String family) {
+    if (sleepNumber > 0) {
+      LOG.info("Region '" + region + "' family '" + family + "' 's maxSpeedInPeak is "
+          + StringUtils.humanReadableInt(maxSpeedInPeak) + "/s compaction throttle: sleep
number  "
+          + sleepNumber + " sleep time " + sleepTimeTotal + "(ms)");
+    }
+  }
+
+  /**
+   * reset start time
+   */
+  void resetStartTime() {
+    start = System.currentTimeMillis();
+  }
+
+  /**
+   * Peak compaction throttle, if it is peak time and the compaction speed is too fast, sleep
for a
+   * while to slow down.
+   */
+  public void throttle(long numOfBytes) throws IOException {
+    bytesWritten += numOfBytes;
+    if (bytesWritten >= checkInterval) {
+      checkAndSlowFastCompact(bytesWritten);
+      bytesWritten = 0;
+    }
+  }
+
+  private void checkAndSlowFastCompact(long numOfBytes) throws IOException {
+    if (!isPeakHour()) {
+      // not peak hour, just return.
+      return;
+    }
+    if (maxSpeedInPeak <= 0) {
+      return;
+    }
+    end = System.currentTimeMillis();
+    long minTimeAllowed = numOfBytes * 1000 / maxSpeedInPeak; // ms
+    long elapsed = end - start;
+    if (elapsed < minTimeAllowed) {
+      // too fast
+      try {
+        // sleep for a while to slow down.
+        Thread.sleep(minTimeAllowed - elapsed);
+        sleepNumber++;
+        sleepTimeTotal += (minTimeAllowed - elapsed);
+      } catch (InterruptedException ie) {
+        IOException iie = new InterruptedIOException();
+        iie.initCause(ie);
+        throw iie;
+      }
+    }
+    resetStartTime();
+  }
+
+  /**
+   * @return whether this is peak hour
+   */
+  private boolean isPeakHour() {
+    return offPeakHours.isOffPeakHour();
+  }
+
+  /**
+   * For test
+   */
+  public int getSleepNumber() {
+    return sleepNumber;
+  }
+}

Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPeakCompactionsThrottle.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPeakCompactionsThrottle.java?rev=1501499&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPeakCompactionsThrottle.java
(added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPeakCompactionsThrottle.java
Tue Jul  9 19:51:36 2013
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestPeakCompactionsThrottle {
+  private static HBaseTestingUtility testUtil;
+  private Configuration conf;
+
+  @BeforeClass
+  public static void setUpClass() {
+    testUtil = new HBaseTestingUtility();
+  }
+
+  @Before
+  public void setUp() {
+    conf = testUtil.getConfiguration();
+  }
+
+  @Test
+  public void testSetPeakHourToTargetTime() throws IOException {
+    conf.set(PeakCompactionsThrottle.PEAK_START_HOUR, "0");
+    conf.set(PeakCompactionsThrottle.PEAK_END_HOUR, "23");
+    PeakCompactionsThrottle peakCompactionsThrottle = new PeakCompactionsThrottle(conf);
+    peakCompactionsThrottle.startCompaction();
+    long numOfBytes = 60 * 1024 * 1024;
+    peakCompactionsThrottle.throttle(numOfBytes);
+    peakCompactionsThrottle.finishCompaction("region", "family");
+    assertTrue(peakCompactionsThrottle.getSleepNumber() > 0);
+  }
+
+  @Test
+  public void testSetPeakHourOutsideCurrentSelection() throws IOException {
+    conf.set(PeakCompactionsThrottle.PEAK_START_HOUR, "-1");
+    conf.set(PeakCompactionsThrottle.PEAK_END_HOUR, "-1");
+    PeakCompactionsThrottle peakCompactionsThrottle = new PeakCompactionsThrottle(conf);
+    peakCompactionsThrottle.startCompaction();
+    long numOfBytes = 30 * 1024 * 1024;
+    peakCompactionsThrottle.throttle(numOfBytes);
+    peakCompactionsThrottle.finishCompaction("region", "family");
+    assertTrue(peakCompactionsThrottle.getSleepNumber() == 0);
+  }
+}



Mime
View raw message