hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1470755 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ test/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/compactions/
Date Mon, 22 Apr 2013 23:45:40 GMT
Author: sershe
Date: Mon Apr 22 23:45:39 2013
New Revision: 1470755

URL: http://svn.apache.org/r1470755
Log:
HBASE-7437 Improve CompactSelection (Hiroshi Ikeda)

Added:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CurrentHourProvider.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java
Removed:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactions.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestOffPeakCompactions.java
Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1470755&r1=1470754&r2=1470755&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Mon Apr 22 23:45:39 2013
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -67,7 +68,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
-import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -146,7 +147,8 @@ public class HStore implements Store {
 
   final StoreEngine<?, ?, ?> storeEngine;
 
-  private OffPeakCompactions offPeakCompactions;
+  private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
+  private final OffPeakHours offPeakHours;
 
   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
   private static int flush_retries_number;
@@ -198,7 +200,7 @@ public class HStore implements Store {
     // to clone it?
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
-    this.offPeakCompactions = new OffPeakCompactions(conf);
+    this.offPeakHours = OffPeakHours.getInstance(conf);
 
     // Setting up cache configuration for this family
     this.cacheConf = new CacheConfig(conf, family);
@@ -1182,13 +1184,21 @@ public class HStore implements Store {
         // Normal case - coprocessor is not overriding file selection.
         if (!compaction.hasSelection()) {
           boolean isUserCompaction = priority == Store.PRIORITY_USER;
-          boolean mayUseOffPeak = this.offPeakCompactions.tryStartOffPeakRequest();
-          compaction.select(this.filesCompacting, isUserCompaction,
+          boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
+              offPeakCompactionTracker.compareAndSet(false, true);
+          try {
+            compaction.select(this.filesCompacting, isUserCompaction,
               mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
+          } catch (IOException e) {
+            if (mayUseOffPeak) {
+              offPeakCompactionTracker.set(false);
+            }
+            throw e;
+          }
           assert compaction.hasSelection();
           if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
             // Compaction policy doesn't want to take advantage of off-peak.
-            this.offPeakCompactions.endOffPeakRequest();
+            offPeakCompactionTracker.set(false);
           }
         }
         if (this.getCoprocessorHost() != null) {
@@ -1248,7 +1258,7 @@ public class HStore implements Store {
   private void finishCompactionRequest(CompactionRequest cr) {
     this.region.reportCompactionRequestEnd(cr.isMajor());
     if (cr.isOffPeak()) {
-      this.offPeakCompactions.endOffPeakRequest();
+      offPeakCompactionTracker.set(false);
       cr.setOffPeak(false);
     }
     synchronized (filesCompacting) {

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CurrentHourProvider.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CurrentHourProvider.java?rev=1470755&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CurrentHourProvider.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CurrentHourProvider.java Mon Apr 22 23:45:39 2013
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class CurrentHourProvider {
+  private CurrentHourProvider() { throw new AssertionError(); }
+
+  private static final class Tick {
+    final int currentHour;
+    final long expirationTimeInMillis;
+
+    Tick(int currentHour, long expirationTimeInMillis) {
+      this.currentHour = currentHour;
+      this.expirationTimeInMillis = expirationTimeInMillis;
+    }
+  }
+
+  private static Tick nextTick() {
+    Calendar calendar = new GregorianCalendar();
+    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+    moveToNextHour(calendar);
+    return new Tick(currentHour, calendar.getTimeInMillis());
+  }
+
+  private static void moveToNextHour(Calendar calendar) {
+    calendar.add(Calendar.HOUR_OF_DAY, 1);
+    calendar.set(Calendar.MINUTE, 0);
+    calendar.set(Calendar.SECOND, 0);
+    calendar.set(Calendar.MILLISECOND, 0);
+  }
+
+  private static volatile Tick tick = nextTick();
+
+  public static int getCurrentHour() {
+    Tick tick = CurrentHourProvider.tick;
+    if(System.currentTimeMillis() < tick.expirationTimeInMillis) {
+      return tick.currentHour;
+    }
+
+    CurrentHourProvider.tick = tick = nextTick();
+    return tick.currentHour;
+  }
+}

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java?rev=1470755&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java Mon Apr 22 23:45:39 2013
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+@InterfaceAudience.Private
+public abstract class OffPeakHours {
+  private static final Log LOG = LogFactory.getLog(OffPeakHours.class);
+
+  public static final OffPeakHours DISABLED = new OffPeakHours() {
+    @Override public boolean isOffPeakHour() { return false; }
+    @Override public boolean isOffPeakHour(int targetHour) { return false; }
+  };
+
+  public static OffPeakHours getInstance(Configuration conf) {
+    int startHour = conf.getInt("hbase.offpeak.start.hour", -1);
+    int endHour = conf.getInt("hbase.offpeak.end.hour", -1);
+    return getInstance(startHour, endHour);
+  }
+
+  /**
+   * @param startHour inclusive
+   * @param endHour exclusive
+   */
+  public static OffPeakHours getInstance(int startHour, int endHour) {
+    if (startHour == -1 && endHour == -1) {
+      return DISABLED;
+    }
+
+    if (! isValidHour(startHour) || ! isValidHour(endHour)) {
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
+            startHour + " end = " + endHour +
+            ". Valid numbers are [0-23]");
+      }
+      return DISABLED;
+    }
+
+    if (startHour == endHour) {
+      return DISABLED;
+    }
+
+    return new OffPeakHoursImpl(startHour, endHour);
+  }
+
+  private static boolean isValidHour(int hour) {
+    return 0 <= hour && hour <= 23;
+  }
+
+  /**
+   * @return whether {@code targetHour} is off-peak hour
+   */
+  public abstract boolean isOffPeakHour(int targetHour);
+
+  /**
+   * @return whether it is off-peak hour
+   */
+  public abstract boolean isOffPeakHour();
+
+  private static class OffPeakHoursImpl extends OffPeakHours {
+    final int startHour;
+    final int endHour;
+
+    /**
+     * @param startHour inclusive
+     * @param endHour exclusive
+     */
+    OffPeakHoursImpl(int startHour, int endHour) {
+      this.startHour = startHour;
+      this.endHour = endHour;
+    }
+
+    @Override
+    public boolean isOffPeakHour() {
+      return isOffPeakHour(CurrentHourProvider.getCurrentHour());
+    }
+
+    @Override
+    public boolean isOffPeakHour(int targetHour) {
+      if (startHour <= endHour) {
+        return startHour <= targetHour && targetHour < endHour;
+      }
+      return targetHour < endHour || startHour <= targetHour;
+    }
+  }
+}

Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java?rev=1470755&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java (added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java Mon Apr 22 23:45:39 2013
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.SmallTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestOffPeakHours {
+  private static HBaseTestingUtility testUtil;
+
+  @BeforeClass
+  public static void setUpClass() {
+    testUtil = new HBaseTestingUtility();
+  }
+
+  private int hourOfDay;
+  private int hourPlusOne;
+  private int hourMinusOne;
+  private int hourMinusTwo;
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    hourOfDay = 15;
+    hourPlusOne = ((hourOfDay+1)%24);
+    hourMinusOne = ((hourOfDay-1+24)%24);
+    hourMinusTwo = ((hourOfDay-2+24)%24);
+    conf = testUtil.getConfiguration();
+  }
+
+  @Test
+  public void testWithoutSettings() {
+    Configuration conf = testUtil.getConfiguration();
+    OffPeakHours target = OffPeakHours.getInstance(conf);
+    assertFalse(target.isOffPeakHour(hourOfDay));
+  }
+
+  @Test
+  public void testSetPeakHourToTargetTime() {
+    conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
+    conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
+    OffPeakHours target = OffPeakHours.getInstance(conf);
+    assertTrue(target.isOffPeakHour(hourOfDay));
+  }
+
+  @Test
+  public void testSetPeakHourOutsideCurrentSelection() {
+    conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
+    conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
+    OffPeakHours target = OffPeakHours.getInstance(conf);
+    assertFalse(target.isOffPeakHour(hourOfDay));
+  }
+}



Mime
View raw message