hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject svn commit: r1445696 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ test/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regio...
Date Wed, 13 Feb 2013 15:56:18 GMT
Author: tedyu
Date: Wed Feb 13 15:56:18 2013
New Revision: 1445696

URL: http://svn.apache.org/r1445696
Log:
HBASE-7822 clean up compactionrequest and compactselection - part 1 (Sergey)


Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactions.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestOffPeakCompactions.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/PerfTestCompactionPolicies.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1445696&r1=1445695&r2=1445696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Wed Feb 13 15:56:18 2013
@@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.ipc.RpcCa
 import org.apache.hadoop.hbase.ipc.UnknownProtocolException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -294,6 +295,9 @@ public class HRegion implements HeapSize
    */
   private boolean isLoadingCfsOnDemandDefault = false;
 
+  private final AtomicInteger majorInProgress = new AtomicInteger(0);
+  private final AtomicInteger minorInProgress = new AtomicInteger(0);
+
   /**
    * @return The smallest mvcc readPoint across all the scanners in this
    * region. Writes older than this readPoint, are included  in every
@@ -5028,7 +5032,7 @@ public class HRegion implements HeapSize
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
       (10 * Bytes.SIZEOF_LONG) +
       Bytes.SIZEOF_BOOLEAN);
 
@@ -5500,6 +5504,24 @@ public class HRegion implements HeapSize
   }
 
   /**
+   * @return if a given region is in compaction now.
+   */
+  public CompactionState getCompactionState() {
+    boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
+    return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
+        : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
+  }
+
+  public void reportCompactionRequestStart(boolean isMajor){
+    (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
+  }
+
+  public void reportCompactionRequestEnd(boolean isMajor){
+    int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
+    assert newValue >= 0;
+  }
+
+  /**
    * Listener class to enable callers of
    * bulkLoadHFile() to perform any necessary
    * pre/post processing of a given bulkload call

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1445696&r1=1445695&r2=1445696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Wed Feb 13 15:56:18 2013
@@ -3303,8 +3303,7 @@ public class HRegionServer implements Cl
       GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
       builder.setRegionInfo(HRegionInfo.convert(info));
       if (request.hasCompactionState() && request.getCompactionState()) {
-        builder.setCompactionState(
-          CompactionRequest.getCompactionState(info.getRegionId()));
+        builder.setCompactionState(region.getCompactionState());
       }
       return builder.build();
     } catch (IOException ie) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1445696&r1=1445695&r2=1445696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
Wed Feb 13 15:56:18 2013
@@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -153,6 +154,8 @@ public class HStore implements Store {
   private final KeyValue.KVComparator comparator;
 
   private Compactor compactor;
+  
+  private OffPeakCompactions offPeakCompactions;
 
   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
   private static int flush_retries_number;
@@ -207,11 +210,11 @@ public class HStore implements Store {
     // to clone it?
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
+    this.offPeakCompactions = new OffPeakCompactions(conf);
 
     // Setting up cache configuration for this family
     this.cacheConf = new CacheConfig(conf, family);
 
-
     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
 
     if (HStore.closeCheckInterval == 0) {
@@ -1242,8 +1245,13 @@ public class HStore implements Store {
           filesToCompact = new CompactSelection(candidates);
         } else {
           boolean isUserCompaction = priority == Store.PRIORITY_USER;
+          boolean mayUseOffPeak = this.offPeakCompactions.tryStartOffPeakRequest();
           filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction,
-              forceMajor && filesCompacting.isEmpty());
+              mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
+          if (mayUseOffPeak && !filesToCompact.isOffPeakCompaction()) {
+            // Compaction policy doesn't want to do anything with off-peak.
+            this.offPeakCompactions.endOffPeakRequest();
+          }
         }
 
         if (region.getCoprocessorHost() != null) {
@@ -1284,14 +1292,17 @@ public class HStore implements Store {
       this.lock.readLock().unlock();
     }
     if (ret != null) {
-      CompactionRequest.preRequest(ret);
+      this.region.reportCompactionRequestStart(ret.isMajor());
     }
     return ret;
   }
 
   public void finishRequest(CompactionRequest cr) {
-    CompactionRequest.postRequest(cr);
-    cr.finishRequest();
+    this.region.reportCompactionRequestEnd(cr.isMajor());
+    if (cr.getCompactSelection().isOffPeakCompaction()) {
+      this.offPeakCompactions.endOffPeakRequest();
+      cr.getCompactSelection().setOffPeak(false);
+    }
     synchronized (filesCompacting) {
       filesCompacting.removeAll(cr.getFiles());
     }
@@ -1868,7 +1879,7 @@ public class HStore implements Store {
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align((20 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
+      ClassSize.align((21 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
               + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java?rev=1445696&r1=1445695&r2=1445696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
Wed Feb 13 15:56:18 2013
@@ -31,21 +31,8 @@ import org.apache.hadoop.hbase.util.Envi
 public class CompactSelection {
   private static final long serialVersionUID = 1L;
   static final Log LOG = LogFactory.getLog(CompactSelection.class);
-  // the actual list - this is needed to handle methods like "sublist"
-  // correctly
+  // the actual list - this is needed to handle methods like "sublist" correctly
   List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
-
-  /**
-   * Number of off peak compactions either in the compaction queue or
-   * happening now. Please lock compactionCountLock before modifying.
-   */
-  static long numOutstandingOffPeakCompactions = 0;
-
-  /**
-   * Lock object for numOutstandingOffPeakCompactions
-   */
-  private final static Object compactionCountLock = new Object();
-
   // was this compaction promoted to an off-peak
   boolean isOffPeakCompaction = false;
   // CompactSelection object creation time.
@@ -57,23 +44,6 @@ public class CompactSelection {
     this.isOffPeakCompaction = false;
   }
 
-  /**
-   * The current compaction finished, so reset the off peak compactions count
-   * if this was an off peak compaction.
-   */
-  public void finishRequest() {
-    if (isOffPeakCompaction) {
-      long newValueToLog = -1;
-      synchronized(compactionCountLock) {
-        assert !isOffPeakCompaction : "Double-counting off-peak count for compaction";
-        newValueToLog = --numOutstandingOffPeakCompactions;
-        isOffPeakCompaction = false;
-      }
-      LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
-          newValueToLog);
-    }
-  }
-
   public List<StoreFile> getFilesToCompact() {
     return filesToCompact;
   }
@@ -84,42 +54,14 @@ public class CompactSelection {
    */
   public void emptyFileList() {
     filesToCompact.clear();
-    if (isOffPeakCompaction) {
-      long newValueToLog = -1;
-      synchronized(compactionCountLock) {
-        // reset the off peak count
-        newValueToLog = --numOutstandingOffPeakCompactions;
-        isOffPeakCompaction = false;
-      }
-      LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
-          newValueToLog);
-    }
   }
 
   public boolean isOffPeakCompaction() {
     return this.isOffPeakCompaction;
   }
 
-  public static long getNumOutStandingOffPeakCompactions() {
-    synchronized(compactionCountLock) {
-      return numOutstandingOffPeakCompactions;
-    }
-  }
-
-  /**
-   * Tries making the compaction off-peak.
-   * Only checks internal compaction constraints, not timing.
-   * @return Eventual value of isOffPeakCompaction.
-   */
-  public boolean trySetOffpeak() {
-    assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this;
-    synchronized(compactionCountLock) {
-      if (numOutstandingOffPeakCompactions == 0) {
-         numOutstandingOffPeakCompactions++;
-         isOffPeakCompaction = true;
-      }
-    }
-    return isOffPeakCompaction;
+  public void setOffPeak(boolean value) {
+    this.isOffPeakCompaction = value;
   }
 
   public long getSelectionTime() {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java?rev=1445696&r1=1445695&r2=1445696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
Wed Feb 13 15:56:18 2013
@@ -58,8 +58,6 @@ public class CompactionConfiguration {
   int maxFilesToCompact;
   double compactionRatio;
   double offPeekCompactionRatio;
-  int offPeakStartHour;
-  int offPeakEndHour;
   long throttlePoint;
   boolean shouldDeleteExpired;
   long majorCompactionPeriod;
@@ -78,17 +76,6 @@ public class CompactionConfiguration {
     maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10);
     compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F);
     offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F);
-    offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
-    offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
-
-    if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
-      if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
-        LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
-            this.offPeakStartHour + " end = " + this.offPeakEndHour +
-            ". Valid numbers are [0-23]");
-      }
-      this.offPeakStartHour = this.offPeakEndHour = -1;
-    }
 
     throttlePoint =  conf.getLong("hbase.regionserver.thread.compaction.throttle",
           2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
@@ -104,16 +91,14 @@ public class CompactionConfiguration {
   @Override
   public String toString() {
     return String.format(
-      "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; off-peak hours %d-%d; "
-      + "throttle point %d;%s delete expired; major period %d, major jitter %f",
+      "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
+      + "%s delete expired; major period %d, major jitter %f",
       minCompactSize,
       maxCompactSize,
       minFilesToCompact,
       maxFilesToCompact,
       compactionRatio,
       offPeekCompactionRatio,
-      offPeakStartHour,
-      offPeakEndHour,
       throttlePoint,
       shouldDeleteExpired ? "" : " don't",
       majorCompactionPeriod,
@@ -170,20 +155,6 @@ public class CompactionConfiguration {
   }
 
   /**
-   * @return Hour at which off-peak compactions start
-   */
-  int getOffPeakStartHour() {
-    return offPeakStartHour;
-  }
-
-  /**
-   * @return Hour at which off-peak compactions end
-   */
-  int getOffPeakEndHour() {
-    return offPeakEndHour;
-  }
-
-  /**
    * @return ThrottlePoint used for classifying small and large compactions
    */
   long getThrottlePoint() {
@@ -212,8 +183,4 @@ public class CompactionConfiguration {
   boolean shouldDeleteExpired() {
     return shouldDeleteExpired;
   }
-
-  private static boolean isValidHour(int hour) {
-    return (hour >= 0 && hour <= 23);
-  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1445696&r1=1445695&r2=1445696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
Wed Feb 13 15:56:18 2013
@@ -70,7 +70,7 @@ public abstract class CompactionPolicy e
    */
   public abstract CompactSelection selectCompaction(
     final List<StoreFile> candidateFiles, final boolean isUserCompaction,
-    final boolean forceMajor) throws IOException;
+    final boolean mayUseOffPeak, final boolean forceMajor) throws IOException;
 
   /**
    * @param storeFiles Store files in the store.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1445696&r1=1445695&r2=1445696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
Wed Feb 13 15:56:18 2013
@@ -50,30 +50,22 @@ import com.google.common.collect.Collect
 public class CompactionRequest implements Comparable<CompactionRequest>,
     Runnable {
     static final Log LOG = LogFactory.getLog(CompactionRequest.class);
-    private final HRegion r;
-    private final HStore s;
+    private final HRegion region;
+    private final HStore store;
     private final CompactSelection compactSelection;
     private final long totalSize;
     private final boolean isMajor;
-    private int p;
+    private int priority;
     private final Long timeInNanos;
     private HRegionServer server = null;
 
-    /**
-     * Map to track the number of compactions requested per region (id)
-     */
-    private static final ConcurrentHashMap<Long, AtomicInteger>
-      majorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
-    private static final ConcurrentHashMap<Long, AtomicInteger>
-      minorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
-
-    public CompactionRequest(HRegion r, HStore s,
-        CompactSelection files, boolean isMajor, int p) {
-      Preconditions.checkNotNull(r);
+    public CompactionRequest(HRegion region, HStore store,
+        CompactSelection files, boolean isMajor, int priority) {
+      Preconditions.checkNotNull(region);
       Preconditions.checkNotNull(files);
 
-      this.r = r;
-      this.s = s;
+      this.region = region;
+      this.store = store;
       this.compactSelection = files;
       long sz = 0;
       for (StoreFile sf : files.getFilesToCompact()) {
@@ -81,67 +73,11 @@ public class CompactionRequest implement
       }
       this.totalSize = sz;
       this.isMajor = isMajor;
-      this.p = p;
+      this.priority = priority;
       this.timeInNanos = System.nanoTime();
     }
 
     /**
-     * Find out if a given region is in compaction now.
-     *
-     * @param regionId
-     * @return a CompactionState
-     */
-    public static CompactionState getCompactionState(
-        final long regionId) {
-      Long key = Long.valueOf(regionId);
-      AtomicInteger major = majorCompactions.get(key);
-      AtomicInteger minor = minorCompactions.get(key);
-      int state = 0;
-      if (minor != null && minor.get() > 0) {
-        state += 1;  // use 1 to indicate minor here
-      }
-      if (major != null && major.get() > 0) {
-        state += 2;  // use 2 to indicate major here
-      }
-      switch (state) {
-      case 3:  // 3 = 2 + 1, so both major and minor
-        return CompactionState.MAJOR_AND_MINOR;
-      case 2:
-        return CompactionState.MAJOR;
-      case 1:
-        return CompactionState.MINOR;
-      default:
-        return CompactionState.NONE;
-      }
-    }
-
-    public static void preRequest(final CompactionRequest cr){
-      Long key = Long.valueOf(cr.getHRegion().getRegionId());
-      ConcurrentHashMap<Long, AtomicInteger> compactions =
-        cr.isMajor() ? majorCompactions : minorCompactions;
-      AtomicInteger count = compactions.get(key);
-      if (count == null) {
-        compactions.putIfAbsent(key, new AtomicInteger(0));
-        count = compactions.get(key);
-      }
-      count.incrementAndGet();
-    }
-
-    public static void postRequest(final CompactionRequest cr){
-      Long key = Long.valueOf(cr.getHRegion().getRegionId());
-      ConcurrentHashMap<Long, AtomicInteger> compactions =
-        cr.isMajor() ? majorCompactions : minorCompactions;
-      AtomicInteger count = compactions.get(key);
-      if (count != null) {
-        count.decrementAndGet();
-      }
-    }
-
-    public void finishRequest() {
-      this.compactSelection.finishRequest();
-    }
-
-    /**
      * This function will define where in the priority queue the request will
      * end up.  Those with the highest priorities will be first.  When the
      * priorities are the same it will first compare priority then date
@@ -160,7 +96,7 @@ public class CompactionRequest implement
       }
       int compareVal;
 
-      compareVal = p - request.p; //compare priority
+      compareVal = priority - request.priority; //compare priority
       if (compareVal != 0) {
         return compareVal;
       }
@@ -181,12 +117,12 @@ public class CompactionRequest implement
 
     /** Gets the HRegion for the request */
     public HRegion getHRegion() {
-      return r;
+      return region;
     }
 
     /** Gets the Store for the request */
     public HStore getStore() {
-      return s;
+      return store;
     }
 
     /** Gets the compact selection object for the request */
@@ -210,7 +146,7 @@ public class CompactionRequest implement
 
     /** Gets the priority for the request */
     public int getPriority() {
-      return p;
+      return priority;
     }
 
     public long getSelectionTime() {
@@ -219,7 +155,7 @@ public class CompactionRequest implement
 
     /** Gets the priority for the request */
     public void setPriority(int p) {
-      this.p = p;
+      this.priority = p;
     }
 
     public void setServer(HRegionServer hrs) {
@@ -241,12 +177,12 @@ public class CompactionRequest implement
               }
             }));
 
-      return "regionName=" + r.getRegionNameAsString() +
-        ", storeName=" + new String(s.getFamily().getName()) +
+      return "regionName=" + region.getRegionNameAsString() +
+        ", storeName=" + new String(store.getFamily().getName()) +
         ", fileCount=" + compactSelection.getFilesToCompact().size() +
         ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
           ((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
-        ", priority=" + p + ", time=" + timeInNanos;
+        ", priority=" + priority + ", time=" + timeInNanos;
     }
 
     @Override
@@ -257,18 +193,18 @@ public class CompactionRequest implement
       }
       try {
         long start = EnvironmentEdgeManager.currentTimeMillis();
-        boolean completed = r.compact(this);
+        boolean completed = region.compact(this);
         long now = EnvironmentEdgeManager.currentTimeMillis();
         LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
         if (completed) {
           // degenerate case: blocked regions require recursive enqueues
-          if (s.getCompactPriority() <= 0) {
+          if (store.getCompactPriority() <= 0) {
             server.compactSplitThread
-              .requestCompaction(r, s, "Recursive enqueue");
+              .requestCompaction(region, store, "Recursive enqueue");
           } else {
             // see if the compaction has caused us to exceed max region size
-            server.compactSplitThread.requestSplit(r);
+            server.compactSplitThread.requestSplit(region);
           }
         }
       } catch (IOException ex) {
@@ -279,7 +215,7 @@ public class CompactionRequest implement
         LOG.error("Compaction failed " + this, ex);
         server.checkFileSystem();
       } finally {
-        s.finishRequest(this);
+        store.finishRequest(this);
         LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
       }
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java?rev=1445696&r1=1445695&r2=1445696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
(original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
Wed Feb 13 15:56:18 2013
@@ -48,7 +48,6 @@ import com.google.common.collect.Collect
 public class DefaultCompactionPolicy extends CompactionPolicy {
 
   private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class);
-  private final static Calendar calendar = new GregorianCalendar();
 
   public DefaultCompactionPolicy() {
     compactor = new DefaultCompactor(this);
@@ -80,7 +79,7 @@ public class DefaultCompactionPolicy ext
    * @throws java.io.IOException
    */
   public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
-      boolean isUserCompaction, boolean forceMajor)
+      final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor)
     throws IOException {
     // Preliminary compaction subject to filters
     CompactSelection candidateSelection = new CompactSelection(candidateFiles);
@@ -110,6 +109,7 @@ public class DefaultCompactionPolicy ext
 
     if (!majorCompaction) {
       // we're doing a minor compaction, let's see what files are applicable
+      candidateSelection.setOffPeak(mayUseOffPeak);
       candidateSelection = filterBulk(candidateSelection);
       candidateSelection = applyCompactionPolicy(candidateSelection);
       candidateSelection = checkMinFilesCriteria(candidateSelection);
@@ -232,6 +232,7 @@ public class DefaultCompactionPolicy ext
           " files ready for compaction.  Need " + minFiles + " to initiate.");
       }
       candidates.emptyFileList();
+      candidates.setOffPeak(false);
     }
     return candidates;
   }
@@ -274,11 +275,9 @@ public class DefaultCompactionPolicy ext
     // we're doing a minor compaction, let's see what files are applicable
     int start = 0;
     double ratio = comConf.getCompactionRatio();
-    if (isOffPeakHour() && candidates.trySetOffpeak()) {
+    if (candidates.isOffPeakCompaction()) {
       ratio = comConf.getCompactionRatioOffPeak();
-      LOG.info("Running an off-peak compaction, selection ratio = " + ratio
-          + ", numOutstandingOffPeakCompactions is now "
-          + CompactSelection.getNumOutStandingOffPeakCompactions());
+      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
     }
 
     // get store file sizes for incremental compacting selection.
@@ -394,21 +393,4 @@ public class DefaultCompactionPolicy ext
     int numCandidates = storeFiles.size() - filesCompacting.size();
     return numCandidates > comConf.getMinFilesToCompact();
   }
-
-  /**
-   * @return whether this is off-peak hour
-   */
-  private boolean isOffPeakHour() {
-    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
-    int startHour = comConf.getOffPeakStartHour();
-    int endHour = comConf.getOffPeakEndHour();
-    // If offpeak time checking is disabled just return false.
-    if (startHour == endHour) {
-      return false;
-    }
-    if (startHour < endHour) {
-      return (currentHour >= startHour && currentHour < endHour);
-    }
-    return (currentHour >= startHour || currentHour < endHour);
-  }
 }

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactions.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactions.java?rev=1445696&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactions.java
(added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactions.java
Wed Feb 13 15:56:18 2013
@@ -0,0 +1,111 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * The class used to track off-peak hours and compactions. Off-peak compaction counter
+ * is global for the entire server, hours can be different per instance of this class,
+ * based on the configuration of the corresponding store.
+ */
+@InterfaceAudience.Private
+public class OffPeakCompactions {
+  private static final Log LOG = LogFactory.getLog(OffPeakCompactions.class);
+  private int offPeakStartHour;
+  private int offPeakEndHour;
+
+  // TODO: replace with AtomicLong, see HBASE-7437.
+  /**
+   * Number of off peak compactions either in the compaction queue or
+   * happening now. Please lock compactionCountLock before modifying.
+   */
+  private static long numOutstanding = 0;
+
+  /** Lock object guarding numOutstanding. */
+  private static final Object compactionCountLock = new Object();
+
+  /** Reads the off-peak window from conf; invalid or missing hours disable off-peak checks. */
+  public OffPeakCompactions(Configuration conf) {
+    offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
+    offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
+    if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
+      if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
+        LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
+            this.offPeakStartHour + " end = " + this.offPeakEndHour +
+            ". Valid numbers are [0-23]");
+      }
+      this.offPeakStartHour = this.offPeakEndHour = -1;
+    }
+  }
+
+  /**
+   * Tries making the compaction off-peak; granted only when no other one is outstanding.
+   * @return Whether the compaction can be made off-peak.
+   */
+  public boolean tryStartOffPeakRequest() {
+    if (!isOffPeakHour()) return false;
+    synchronized(compactionCountLock) {
+      if (numOutstanding == 0) {
+         numOutstanding++;
+         return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Marks an off-peak compaction as finished by decrementing the outstanding count.
+   * The decrement is unconditional: call only after tryStartOffPeakRequest() returned true.
+   */
+  public void endOffPeakRequest() {
+    long newValueToLog = -1;
+    synchronized(compactionCountLock) {
+      newValueToLog = --numOutstanding;
+    }
+    LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +  newValueToLog);
+  }
+
+  /**
+   * @return whether this is off-peak hour
+   */
+  private boolean isOffPeakHour() {
+    // If offpeak time checking is disabled just return false.
+    if (this.offPeakStartHour == this.offPeakEndHour) {
+      return false;
+    }
+    // Take a fresh Calendar on every call: a cached instance keeps the time it was
+    // created with, so the hour would never advance; Calendar is also not thread-safe.
+    int currentHour = new GregorianCalendar().get(Calendar.HOUR_OF_DAY);
+    if (this.offPeakStartHour < this.offPeakEndHour) {
+      return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
+    }
+    return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
+  }
+
+  private static boolean isValidHour(int hour) {
+    return (hour >= 0 && hour <= 23);
+  }
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java?rev=1445696&r1=1445695&r2=1445696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionState.java
Wed Feb 13 15:56:18 2013
@@ -163,7 +163,7 @@ public class TestCompactionState {
       // otherwise, the compaction should have already been done
       if (expectedState != state) {
         for (HRegion region: regions) {
-          state = CompactionRequest.getCompactionState(region.getRegionId());
+          state = region.getCompactionState();
           assertEquals(CompactionState.NONE, state);
         }
       } else {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java?rev=1445696&r1=1445695&r2=1445696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
Wed Feb 13 15:56:18 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -220,16 +221,25 @@ public class TestDefaultCompactSelection
 
   void compactEquals(List<StoreFile> candidates, long... expected)
     throws IOException {
-    compactEquals(candidates, false, expected);
+    compactEquals(candidates, false, false, expected);
   }
 
-  void compactEquals(List<StoreFile> candidates, boolean forcemajor,
+  void compactEquals(List<StoreFile> candidates, boolean forcemajor, long... expected)
+    throws IOException {
+    compactEquals(candidates, forcemajor, false, expected);
+  }
+
+  void compactEquals(List<StoreFile> candidates, boolean forcemajor, boolean isOffPeak,
       long ... expected)
   throws IOException {
     store.forceMajor = forcemajor;
     //Test Default compactions
-    List<StoreFile> actual = store.compactionPolicy
-      .selectCompaction(candidates, false, forcemajor).getFilesToCompact();
+    CompactSelection result = store.compactionPolicy
+        .selectCompaction(candidates, false, isOffPeak, forcemajor);
+    List<StoreFile> actual = result.getFilesToCompact();
+    if (isOffPeak && !forcemajor) {
+      assertTrue(result.isOffPeakCompaction());
+    }
     assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
     store.forceMajor = false;
   }
@@ -309,36 +319,11 @@ public class TestDefaultCompactSelection
      * current compaction algorithm.  Developed to ensure that refactoring
      * doesn't implicitly alter this.
      */
-    //long tooBig = maxSize + 1;
-
-    Calendar calendar = new GregorianCalendar();
-    int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
-    LOG.debug("Hour of day = " + hourOfDay);
-    int hourPlusOne = ((hourOfDay+1)%24);
-    int hourMinusOne = ((hourOfDay-1+24)%24);
-    int hourMinusTwo = ((hourOfDay-2+24)%24);
-
-    // check compact selection without peak hour setting
-    LOG.debug("Testing compact selection without off-peak settings...");
-    compactEquals(sfCreate(999,50,12,12,1), 12, 12, 1);
-
     // set an off-peak compaction threshold
     this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
-
-    // set peak hour to current time and check compact selection
-    this.conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
-    this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
-    LOG.debug("Testing compact selection with off-peak settings (" +
-        hourMinusOne + ", " + hourPlusOne + ")");
-    store.compactionPolicy.updateConfiguration();
-    compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1);
-
-    // set peak hour outside current selection and check compact selection
-    this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
-    this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
     store.compactionPolicy.updateConfiguration();
-    LOG.debug("Testing compact selection with off-peak settings (" +
-        hourMinusTwo + ", " + hourMinusOne + ")");
-    compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);
+    // Test with and without the flag.
+    compactEquals(sfCreate(999, 50, 12, 12, 1), false, true, 50, 12, 12, 1);
+    compactEquals(sfCreate(999, 50, 12, 12, 1), 12, 12, 1);
   }
 }

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestOffPeakCompactions.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestOffPeakCompactions.java?rev=1445696&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestOffPeakCompactions.java
(added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestOffPeakCompactions.java
Wed Feb 13 15:56:18 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestOffPeakCompactions {
+  private final static Log LOG = LogFactory.getLog(TestOffPeakCompactions.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Test
+  public void testOffPeakHours() throws IOException {
+    Calendar calendar = new GregorianCalendar();
+    int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
+    LOG.debug("Hour of day = " + hourOfDay);
+    int hourPlusOne = ((hourOfDay+1)%24);
+    int hourMinusOne = ((hourOfDay-1+24)%24);
+    int hourMinusTwo = ((hourOfDay-2+24)%24);
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    OffPeakCompactions opc = new OffPeakCompactions(conf);
+    LOG.debug("Testing without off-peak settings...");
+    assertFalse(opc.tryStartOffPeakRequest());
+
+    // put the current hour inside the off-peak window: the request must be granted
+    conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
+    conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
+    opc = new OffPeakCompactions(conf);
+    LOG.debug("Testing compact selection with off-peak settings (" +
+        hourMinusOne + ", " + hourPlusOne + ")");
+    assertTrue(opc.tryStartOffPeakRequest());
+    opc.endOffPeakRequest();
+
+    // move the off-peak window so it ends before the current hour: the request must be refused
+    conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
+    conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
+    opc = new OffPeakCompactions(conf);
+    assertFalse(opc.tryStartOffPeakRequest());
+  }
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/PerfTestCompactionPolicies.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/PerfTestCompactionPolicies.java?rev=1445696&r1=1445695&r2=1445696&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/PerfTestCompactionPolicies.java
(original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/PerfTestCompactionPolicies.java
Wed Feb 13 15:56:18 2013
@@ -170,7 +170,7 @@ public class PerfTestCompactionPolicies 
   private List<StoreFile> runIteration(List<StoreFile> startingStoreFiles) throws
IOException {
 
     List<StoreFile> storeFiles = new ArrayList<StoreFile>(startingStoreFiles);
-    CompactSelection sel = cp.selectCompaction(storeFiles, false, false);
+    CompactSelection sel = cp.selectCompaction(storeFiles, false, false, false);
     int newFileSize = 0;
 
     List<StoreFile> filesToCompact = sel.getFilesToCompact();



Mime
View raw message