hbase-commits mailing list archives

From: st...@apache.org
Subject: svn commit: r1340280 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/
Date: Fri, 18 May 2012 22:08:44 GMT
Author: stack
Date: Fri May 18 22:08:44 2012
New Revision: 1340280

URL: http://svn.apache.org/viewvc?rev=1340280&view=rev
Log:
HBASE-5920 New Compactions Logic can silently prevent user-initiated compactions from occurring
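
In short, this change moves the compaction priority constants from CompactSplitThread into Store and threads the caller's priority through Store.requestCompaction(int priority), so a user-requested major compaction is no longer silently downgraded when a store already holds many files. A minimal sketch of the API after this commit (the wrapper class and variable names below are illustrative only, not part of the patch):

    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

    public class RequestCompactionExample {
      // A user-initiated request keeps top priority (Store.PRIORITY_USER = 1)
      // and is not dropped just because the store already has many files.
      static CompactionRequest requestAsUser(Store store) {
        return store.requestCompaction(Store.PRIORITY_USER);
      }

      // A system-initiated request passes no explicit priority; the no-arg
      // overload delegates to requestCompaction(Store.NO_PRIORITY).
      static CompactionRequest requestAsSystem(Store store) {
        return store.requestCompaction();
      }
    }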

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1340280&r1=1340279&r2=1340280&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Fri May 18 22:08:44 2012
@@ -50,12 +50,6 @@ public class CompactSplitThread implemen
   private final ThreadPoolExecutor smallCompactions;
   private final ThreadPoolExecutor splits;
 
-  /* The default priority for user-specified compaction requests.
-   * The user gets top priority unless we have blocking compactions. (Pri <= 0)
-   */
-  public static final int PRIORITY_USER = 1;
-  public static final int NO_PRIORITY = Integer.MIN_VALUE;
-
   /**
    * Splitting should not take place if the total number of regions exceed this.
    * This is not a hard limit to the number of regions but it is a guideline to
@@ -129,7 +123,7 @@ public class CompactSplitThread implemen
 
   public synchronized boolean requestSplit(final HRegion r) {
     // don't split regions that are blocking
-    if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
+    if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
       byte[] midKey = r.checkSplit();
       if (midKey != null) {
         requestSplit(r, midKey);
@@ -158,13 +152,13 @@ public class CompactSplitThread implemen
   public synchronized void requestCompaction(final HRegion r,
       final String why) {
     for(Store s : r.getStores().values()) {
-      requestCompaction(r, s, why, NO_PRIORITY);
+      requestCompaction(r, s, why, Store.NO_PRIORITY);
     }
   }
 
   public synchronized void requestCompaction(final HRegion r, final Store s,
       final String why) {
-    requestCompaction(r, s, why, NO_PRIORITY);
+    requestCompaction(r, s, why, Store.NO_PRIORITY);
   }
 
   public synchronized void requestCompaction(final HRegion r, final String why,
@@ -185,10 +179,10 @@ public class CompactSplitThread implemen
     if (this.server.isStopped()) {
       return;
     }
-    CompactionRequest cr = s.requestCompaction();
+    CompactionRequest cr = s.requestCompaction(priority);
     if (cr != null) {
       cr.setServer(server);
-      if (priority != NO_PRIORITY) {
+      if (priority != Store.NO_PRIORITY) {
         cr.setPriority(priority);
       }
       ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
@@ -200,6 +194,11 @@ public class CompactSplitThread implemen
             + (why != null && !why.isEmpty() ? "; Because: " + why : "")
             + "; " + this);
       }
+    } else {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Not compacting " + r.getRegionNameAsString() + 
+            " because compaction request was cancelled");
+      }
     }
   }
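
With the constants moved to Store, the split guard above compares the region's compaction priority against Store.PRIORITY_USER: regions whose priority has dropped into the blocking range (<= 0) are not split. A small illustration of that comparison, using made-up numbers rather than anything from this patch:

    import org.apache.hadoop.hbase.regionserver.Store;

    public class SplitEligibilityExample {
      public static void main(String[] args) {
        // A store's computed priority is roughly
        // blockingStoreFileCount - (current number of store files).
        int blockingStoreFileCount = 7;   // e.g. hbase.hstore.blockingStoreFiles
        int storeFileCount = 4;
        int priority = blockingStoreFileCount - storeFileCount;    // 3
        // The guard in CompactSplitThread.requestSplit():
        boolean splitEligible = priority >= Store.PRIORITY_USER;   // true (3 >= 1)
        System.out.println("split eligible: " + splitEligible);
      }
    }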
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1340280&r1=1340279&r2=1340280&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri May 18 22:08:44 2012
@@ -3532,9 +3532,11 @@ public class  HRegionServer implements C
       if (major) {
         region.triggerMajorCompaction();
       }
+      LOG.trace("User-triggered compaction requested for region " +
+        region.getRegionNameAsString());
       compactSplitThread.requestCompaction(region,
         "User-triggered " + (major ? "major " : "") + "compaction",
-          CompactSplitThread.PRIORITY_USER);
+          Store.PRIORITY_USER);
       return CompactRegionResponse.newBuilder().build();
     } catch (IOException ie) {
       throw new ServiceException(ie);
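
The HRegionServer hunk above is the entry point for the user path: an admin-issued compaction arrives at compactRegion() and is now queued with Store.PRIORITY_USER. A hedged client-side sketch of how such a request is typically issued (standard HBaseAdmin calls; the table name is made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class UserCompactionExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
          // Asks every region of 'mytable' to major compact; with this patch
          // the request reaches each Store carrying Store.PRIORITY_USER.
          admin.majorCompact("mytable");
        } finally {
          admin.close();
        }
      }
    }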

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1340280&r1=1340279&r2=1340280&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri May 18 22:08:44 2012
@@ -108,6 +108,7 @@ import com.google.common.collect.Lists;
 @InterfaceAudience.Private
 public class Store extends SchemaConfigured implements HeapSize {
   static final Log LOG = LogFactory.getLog(Store.class);
+
   protected final MemStore memstore;
   // This stores directory in the filesystem.
   private final Path homedir;
@@ -133,6 +134,12 @@ public class Store extends SchemaConfigu
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final boolean verifyBulkLoads;
 
+  /* The default priority for user-specified compaction requests.
+   * The user gets top priority unless we have blocking compactions. (Pri <= 0)
+   */
+  public static final int PRIORITY_USER = 1;
+  public static final int NO_PRIORITY = Integer.MIN_VALUE;
+
   // not private for testing
   /* package */ScanInfo scanInfo;
   /*
@@ -166,7 +173,7 @@ public class Store extends SchemaConfigu
    * @param region
    * @param family HColumnDescriptor for this column
    * @param fs file system object
-   * @param conf configuration object
+   * @param confParam configuration object
    * failed.  Can be null.
    * @throws IOException
    */
@@ -342,7 +349,7 @@ public class Store extends SchemaConfigu
   Path getHomedir() {
     return homedir;
   }
-  
+
   /**
    * @return the data block encoder
    */
@@ -464,7 +471,7 @@ public class Store extends SchemaConfigu
 
   /**
    * Removes a kv from the memstore. The KeyValue is removed only
-   * if its key & memstoreTS matches the key & memstoreTS value of the 
+   * if its key & memstoreTS matches the key & memstoreTS value of the
    * kv parameter.
    *
    * @param kv
@@ -550,8 +557,8 @@ public class Store extends SchemaConfigu
   }
 
   /**
-   * This method should only be called from HRegion.  It is assumed that the 
-   * ranges of values in the HFile fit within the stores assigned region. 
+   * This method should only be called from HRegion.  It is assumed that the
+   * ranges of values in the HFile fit within the stores assigned region.
    * (assertBulkLoadHFileOk checks this)
    */
   void bulkLoadHFile(String srcPathStr) throws IOException {
@@ -631,7 +638,7 @@ public class Store extends SchemaConfigu
         ThreadPoolExecutor storeFileCloserThreadPool = this.region
             .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
                 + this.family.getNameAsString());
-  
+
         // close each store file in parallel
         CompletionService<Void> completionService =
           new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
@@ -643,7 +650,7 @@ public class Store extends SchemaConfigu
             }
           });
         }
-  
+
         try {
           for (int i = 0; i < result.size(); i++) {
             Future<Void> future = completionService.take();
@@ -772,7 +779,7 @@ public class Store extends SchemaConfigu
       scanner.close();
     }
     if (LOG.isInfoEnabled()) {
-      LOG.info("Flushed " + 
+      LOG.info("Flushed " +
                ", sequenceid=" + logCacheFlushId +
                ", memsize=" + StringUtils.humanReadableInt(flushed) +
                ", into tmp file " + pathName);
@@ -983,7 +990,7 @@ public class Store extends SchemaConfigu
    * <p>We don't want to hold the structureLock for the whole time, as a compact()
    * can be lengthy and we want to allow cache-flushes during this period.
    *
-   * @param CompactionRequest
+   * @param cr
    *          compaction details obtained from requestCompaction()
    * @throws IOException
    * @return Storefile we compacted into or null if we failed or opted out early.
@@ -1221,7 +1228,7 @@ public class Store extends SchemaConfigu
       if (jitterPct > 0) {
         long jitter = Math.round(ret * jitterPct);
         // deterministic jitter avoids a major compaction storm on restart
-        ImmutableList<StoreFile> snapshot = storefiles; 
+        ImmutableList<StoreFile> snapshot = storefiles;
         if (snapshot != null && !snapshot.isEmpty()) {
           String seed = snapshot.get(0).getPath().getName();
           double curRand = new Random(seed.hashCode()).nextDouble();
@@ -1235,6 +1242,10 @@ public class Store extends SchemaConfigu
   }
 
   public CompactionRequest requestCompaction() {
+    return requestCompaction(NO_PRIORITY);
+  }
+
+  public CompactionRequest requestCompaction(int priority) {
     // don't even select for compaction if writes are disabled
     if (!this.region.areWritesEnabled()) {
       return null;
@@ -1265,7 +1276,7 @@ public class Store extends SchemaConfigu
           // coprocessor is overriding normal file selection
           filesToCompact = new CompactSelection(conf, candidates);
         } else {
-          filesToCompact = compactSelection(candidates);
+          filesToCompact = compactSelection(candidates, priority);
         }
 
         if (region.getCoprocessorHost() != null) {
@@ -1295,7 +1306,7 @@ public class Store extends SchemaConfigu
         }
 
         // everything went better than expected. create a compaction request
-        int pri = getCompactPriority();
+        int pri = getCompactPriority(priority);
         ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
       }
     } catch (IOException ex) {
@@ -1315,6 +1326,16 @@ public class Store extends SchemaConfigu
   }
 
   /**
+   * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
+   * @param candidates
+   * @return
+   * @throws IOException
+   */
+  CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
+    return compactSelection(candidates,NO_PRIORITY);
+  }
+
+  /**
    * Algorithm to choose which files to compact
    *
    * Configuration knobs:
@@ -1333,7 +1354,7 @@ public class Store extends SchemaConfigu
    * @return subset copy of candidate list that meets compaction criteria
    * @throws IOException
    */
-  CompactSelection compactSelection(List<StoreFile> candidates)
+  CompactSelection compactSelection(List<StoreFile> candidates, int priority)
       throws IOException {
     // ASSUMPTION!!! filesCompacting is locked when calling this function
 
@@ -1381,10 +1402,16 @@ public class Store extends SchemaConfigu
       return compactSelection;
     }
 
-    // major compact on user action or age (caveat: we have too many files)
-    boolean majorcompaction =
-      (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact()))
-      && compactSelection.getFilesToCompact().size() < this.maxFilesToCompact;
+    // Force a major compaction if this is a user-requested major compaction,
+    // or if we do not have too many files to compact and this was requested
+    // as a major compaction
+    boolean majorcompaction = (forcemajor && priority == PRIORITY_USER) ||
+      (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
+      (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
+    );
+    LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
+      this.getColumnFamilyName() + ": Initiating " +
+      (majorcompaction ? "major" : "minor") + "compaction");
 
     if (!majorcompaction &&
         !hasReferences(compactSelection.getFilesToCompact())) {
@@ -1394,6 +1421,11 @@ public class Store extends SchemaConfigu
 
       // skip selection algorithm if we don't have enough files
       if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Not compacting files because we only have " +
+            compactSelection.getFilesToCompact().size() +
+            " files ready for compaction.  Need " + this.minFilesToCompact + " to initiate.");
+        }
         compactSelection.emptyFileList();
         return compactSelection;
       }
@@ -1461,11 +1493,18 @@ public class Store extends SchemaConfigu
         return compactSelection;
       }
     } else {
-      // all files included in this compaction, up to max
-      if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
-        int pastMax =
-          compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
-        compactSelection.clearSubList(0, pastMax);
+      if(majorcompaction) {
+        if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
+          LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
+            " files, probably because of a user-requested major compaction");
+          if(priority != PRIORITY_USER) {
+            LOG.error("Compacting more than max files on a non user-requested compaction");
+          }
+        }
+      } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
+        // all files included in this compaction, up to max
+        int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
+        compactSelection.getFilesToCompact().subList(0, pastMax).clear();
       }
     }
     return compactSelection;
@@ -1991,11 +2030,21 @@ public class Store extends SchemaConfigu
     return this.memstore.heapSize();
   }
 
+  public int getCompactPriority() {
+    return getCompactPriority(NO_PRIORITY);
+  }
+
   /**
    * @return The priority that this store should have in the compaction queue
+   * @param priority
    */
-  public int getCompactPriority() {
-    return this.blockingStoreFileCount - this.storefiles.size();
+  public int getCompactPriority(int priority) {
+    // If this is a user-requested compaction, leave this at the highest priority
+    if(priority == PRIORITY_USER) {
+      return PRIORITY_USER;
+    } else {
+      return this.blockingStoreFileCount - this.storefiles.size();
+    }
   }
 
   boolean throttleCompaction(long compactionSize) {
@@ -2131,7 +2180,7 @@ public class Store extends SchemaConfigu
     return this.cacheConf;
   }
 
-  public static final long FIXED_OVERHEAD = 
+  public static final long FIXED_OVERHEAD =
       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
           + (17 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
           + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
@@ -2209,7 +2258,7 @@ public class Store extends SchemaConfigu
     public boolean getKeepDeletedCells() {
       return keepDeletedCells;
     }
-    
+
     public long getTimeToPurgeDeletes() {
       return timeToPurgeDeletes;
     }
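
Taken together, the Store changes make two decisions priority-aware: compactSelection() forces a major compaction for a PRIORITY_USER request even when the file count exceeds the per-compaction maximum, and getCompactPriority(int) pins user requests at top priority instead of deriving a value from the store-file count. A small sketch of that priority rule (the helper method and numbers are hypothetical):

    import org.apache.hadoop.hbase.regionserver.Store;

    public class CompactPriorityExample {
      // Mirrors the new rule in Store.getCompactPriority(int): a user request
      // keeps PRIORITY_USER; anything else falls back to the file-count rule.
      static int effectivePriority(int requested, int blockingStoreFileCount,
          int storeFileCount) {
        if (requested == Store.PRIORITY_USER) {
          return Store.PRIORITY_USER;
        }
        return blockingStoreFileCount - storeFileCount;
      }

      public static void main(String[] args) {
        System.out.println(effectivePriority(Store.NO_PRIORITY, 7, 10));   // -3 (blocking)
        System.out.println(effectivePriority(Store.PRIORITY_USER, 7, 10)); //  1 (user)
      }
    }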

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1340280&r1=1340279&r2=1340280&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Fri May 18 22:08:44 2012
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.experimental.categories.Category;
@@ -76,6 +77,7 @@ public class TestCompaction extends HBas
   private int compactionThreshold;
   private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
   final private byte[] col1, col2;
+  private static final long MAX_FILES_TO_COMPACT = 10;
 
   /** constructor */
   public TestCompaction() throws Exception {
@@ -614,6 +616,43 @@ public class TestCompaction extends HBas
     fail("testCompactionWithCorruptResult failed since no exception was" +
         "thrown while completing a corrupt file");
   }
+  
+  /**
+   * Test for HBASE-5920 - Test user requested major compactions always occurring
+   */
+  public void testNonUserMajorCompactionRequest() throws Exception {
+    Store store = r.getStore(COLUMN_FAMILY);
+    createStoreFile(r);
+    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
+      createStoreFile(r);
+    }
+    store.triggerMajorCompaction();
+
+    CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY);
+    assertNotNull("Expected to receive a compaction request", request);
+    assertEquals(
+      "System-requested major compaction should not occur if there are too many store files",
+      false,
+      request.isMajor());
+  }
+
+  /**
+   * Test for HBASE-5920
+   */
+  public void testUserMajorCompactionRequest() throws IOException{
+    Store store = r.getStore(COLUMN_FAMILY);
+    createStoreFile(r);
+    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
+      createStoreFile(r);
+    }
+    store.triggerMajorCompaction();
+    CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER);
+    assertNotNull("Expected to receive a compaction request", request);
+    assertEquals(
+      "User-requested major compaction should always occur, even if there are too many store
files",
+      true, 
+      request.isMajor());
+  }
 
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =


