hbase-commits mailing list archives

From: nspiegelb...@apache.org
Subject: svn commit: r1101677 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/compactions/
Date: Tue, 10 May 2011 23:21:04 GMT
Author: nspiegelberg
Date: Tue May 10 23:21:04 2011
New Revision: 1101677

URL: http://svn.apache.org/viewvc?rev=1101677&view=rev
Log:
HBASE-1476 Multithreaded Compactions

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue May 10 23:21:04 2011
@@ -212,6 +212,8 @@ Release 0.91.0 - Unreleased
    HBASE-3721  Speedup LoadIncrementalHFiles (Ted Yu)
    HBASE-3855  Performance degradation of memstore because reseek is linear
                (dhruba borthakur)
+   HBASE-3797  StoreFile Level Compaction Locking
+   HBASE-1476  Multithreaded Compactions
 
   TASKS
    HBASE-3559  Move report of split to master OFF the heartbeat channel

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue May 10 23:21:04 2011
@@ -19,34 +19,32 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Compact region on request and then run split if appropriate
  */
-public class CompactSplitThread extends Thread implements CompactionRequestor {
+public class CompactSplitThread implements CompactionRequestor {
   static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
-  private final long frequency;
-  private final ReentrantLock lock = new ReentrantLock();
 
   private final HRegionServer server;
   private final Configuration conf;
 
-  protected final BlockingQueue<CompactionRequest> compactionQueue =
-    new PriorityBlockingQueue<CompactionRequest>();
+  private final ThreadPoolExecutor largeCompactions;
+  private final ThreadPoolExecutor smallCompactions;
+  private final ThreadPoolExecutor splits;
+  private final long throttleSize;
 
   /* The default priority for user-specified compaction requests.
    * The user gets top priority unless we have blocking compactions. (Pri <= 0)
@@ -62,85 +60,71 @@ public class CompactSplitThread extends 
   private int regionSplitLimit;
 
   /** @param server */
-  public CompactSplitThread(HRegionServer server) {
+  CompactSplitThread(HRegionServer server) {
     super();
     this.server = server;
     this.conf = server.getConfiguration();
     this.regionSplitLimit = conf.getInt("hbase.regionserver.regionSplitLimit",
         Integer.MAX_VALUE);
-    this.frequency =
-      conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
-      20 * 1000);
+
+    int largeThreads = Math.max(1, conf.getInt(
+        "hbase.regionserver.thread.compaction.large", 1));
+    int smallThreads = conf.getInt(
+        "hbase.regionserver.thread.compaction.small", 0);
+    throttleSize = conf.getLong(
+        "hbase.regionserver.thread.compaction.throttle", 0);
+    int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
+
+    // if we have throttle threads, make sure the user also specified size
+    Preconditions.checkArgument(smallThreads == 0 || throttleSize > 0);
+
+    this.largeCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
+        60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
+    this.largeCompactions
+        .setRejectedExecutionHandler(new CompactionRequest.Rejection());
+    if (smallThreads <= 0) {
+      this.smallCompactions = null;
+    } else {
+      this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
+          60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
+      this.smallCompactions
+          .setRejectedExecutionHandler(new CompactionRequest.Rejection());
+    }
+    this.splits = (ThreadPoolExecutor) Executors
+        .newFixedThreadPool(splitThreads);
   }
 
   @Override
-  public void run() {
-    while (!this.server.isStopped()) {
-      CompactionRequest compactionRequest = null;
-      HRegion r = null;
-      boolean completed = false;
-      try {
-        compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
-        if (compactionRequest != null) {
-          r = compactionRequest.getHRegion();
-          lock.lock();
-          try {
-            // look for a split first
-            if(!this.server.isStopped()) {
-              // don't split regions that are blocking
-              if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
-                byte[] midkey = compactionRequest.getStore().checkSplit();
-                if (midkey != null) {
-                  split(r, midkey);
-                  continue;
-                }
-              }
-            }
-
-            // now test for compaction
-            if(!this.server.isStopped()) {
-              long startTime = EnvironmentEdgeManager.currentTimeMillis();
-              completed = r.compact(compactionRequest);
-              long now = EnvironmentEdgeManager.currentTimeMillis();
-              LOG.info(((completed) ? "completed" : "aborted")
-                  + " compaction: " + compactionRequest + ", duration="
-                  + StringUtils.formatTimeDiff(now, startTime));
-              if (completed) { // compaction aborted?
-                this.server.getMetrics().
-                  addCompaction(now - startTime, compactionRequest.getSize());
-              }
-            }
-          } finally {
-            lock.unlock();
-          }
-        }
-      } catch (InterruptedException ex) {
-        continue;
-      } catch (IOException ex) {
-        LOG.error("Compaction/Split failed " + compactionRequest,
-          RemoteExceptionHandler.checkIOException(ex));
-        if (!server.checkFileSystem()) {
-          break;
-        }
-      } catch (Exception ex) {
-        LOG.error("Compaction failed " + compactionRequest, ex);
-        if (!server.checkFileSystem()) {
-          break;
-        }
-      } finally {
-        if (compactionRequest != null) {
-          Store s = compactionRequest.getStore();
-          s.finishRequest(compactionRequest);
-          // degenerate case: blocked regions require recursive enqueues
-          if (s.getCompactPriority() < PRIORITY_USER && completed) {
-            requestCompaction(r, s, "Recursive enqueue");
-          }
-        }
-        compactionRequest = null;
+  public String toString() {
+    return "compaction_queue="
+        + (smallCompactions != null ? "("
+            + largeCompactions.getQueue().size() + ":"
+            + smallCompactions.getQueue().size() + ")"
+            : largeCompactions.getQueue().size())
+        + ", split_queue=" + splits.getQueue().size();
+  }
+
+  public synchronized boolean requestSplit(final HRegion r) {
+    // don't split regions that are blocking
+    if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
+      byte[] midKey = r.checkSplit();
+      if (midKey != null) {
+        requestSplit(r, midKey);
+        return true;
       }
     }
-    compactionQueue.clear();
-    LOG.info(getName() + " exiting");
+    return false;
+  }
+
+  public synchronized void requestSplit(final HRegion r, byte[] midKey) {
+    try {
+      this.splits.execute(new SplitRequest(r, midKey, this.server));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Split requested for " + r + ".  " + this);
+      }
+    } catch (RejectedExecutionException ree) {
+      LOG.info("Could not execute split for " + r, ree);
+    }
   }
 
   public synchronized void requestCompaction(final HRegion r,
@@ -164,7 +148,7 @@ public class CompactSplitThread extends 
 
   /**
    * @param r HRegion store belongs to
-   * @param force Whether next compaction should be major
+   * @param s Store to request compaction on
    * @param why Why compaction requested -- used in debug messages
    * @param priority override the default priority (NO_PRIORITY == decide)
    */
@@ -175,67 +159,58 @@ public class CompactSplitThread extends 
     }
     CompactionRequest cr = s.requestCompaction();
     if (cr != null) {
+      cr.setServer(server);
       if (priority != NO_PRIORITY) {
         cr.setPriority(priority);
       }
-      boolean addedToQueue = compactionQueue.add(cr);
-      if (!addedToQueue) {
-        LOG.error("Could not add request to compaction queue: " + cr);
-        s.finishRequest(cr);
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug("Compaction requested: " + cr
-            + (why != null && !why.isEmpty() ? "; Because: " + why : "")
-            + "; Priority: " + priority + "; Compaction queue size: "
-            + compactionQueue.size());
+      ThreadPoolExecutor pool = largeCompactions;
+      if (smallCompactions != null && throttleSize > cr.getSize()) {
+        // smallCompactions is like the 10 items or less line at Walmart
+        pool = smallCompactions;
       }
-    }
-  }
-
-  private void split(final HRegion parent, final byte [] midKey)
-  throws IOException {
-    final long startTime = System.currentTimeMillis();
-    SplitTransaction st = new SplitTransaction(parent, midKey);
-    // If prepare does not return true, for some reason -- logged inside in
-    // the prepare call -- we are not ready to split just now.  Just return.
-    if (!st.prepare()) return;
-    try {
-      st.execute(this.server, this.server);
-    } catch (Exception e) {
-      try {
-        LOG.info("Running rollback of failed split of " +
-          parent.getRegionNameAsString() + "; " + e.getMessage());
-        st.rollback(this.server, this.server);
-        LOG.info("Successful rollback of failed split of " +
-          parent.getRegionNameAsString());
-      } catch (Exception ee) {
-        // If failed rollback, kill this server to avoid having a hole in table.
-        LOG.info("Failed rollback of failed split of " +
-          parent.getRegionNameAsString() + " -- aborting server", ee);
-        this.server.abort("Failed split");
+      pool.execute(cr);
+      if (LOG.isDebugEnabled()) {
+        String type = "";
+        if (smallCompactions != null) {
+          type = (pool == smallCompactions) ? "Small " : "Large ";
+        }
+        LOG.debug(type + "Compaction requested: " + cr
+            + (why != null && !why.isEmpty() ? "; Because: " + why : "")
+            + "; " + this);
       }
-      return;
     }
-
-    LOG.info("Region split, META updated, and report to master. Parent=" +
-      parent.getRegionInfo().getRegionNameAsString() + ", new regions: " +
-      st.getFirstDaughter().getRegionNameAsString() + ", " +
-      st.getSecondDaughter().getRegionNameAsString() + ". Split took " +
-      StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
   }
 
   /**
    * Only interrupt once it's done with a run through the work loop.
    */
   void interruptIfNecessary() {
-    if (lock.tryLock()) {
+    splits.shutdown();
+    largeCompactions.shutdown();
+    if (smallCompactions != null)
+      smallCompactions.shutdown();
+  }
+
+  private void waitFor(ThreadPoolExecutor t, String name) {
+    boolean done = false;
+    while (!done) {
       try {
-        this.interrupt();
-      } finally {
-        lock.unlock();
+        done = t.awaitTermination(60, TimeUnit.SECONDS);
+        LOG.debug("Waiting for " + name + " to finish...");
+      } catch (InterruptedException ie) {
+        LOG.debug("Interrupted waiting for " + name + " to finish...");
       }
     }
   }
 
+  void join() {
+    waitFor(splits, "Split Thread");
+    waitFor(largeCompactions, "Large Compaction Thread");
+    if (smallCompactions != null) {
+      waitFor(smallCompactions, "Small Compaction Thread");
+    }
+  }
+
   /**
    * Returns the current size of the queue containing regions that are
    * processed.
@@ -243,7 +218,10 @@ public class CompactSplitThread extends 
    * @return The current size of the regions queue.
    */
   public int getCompactionQueueSize() {
-    return compactionQueue.size();
+    int size = largeCompactions.getQueue().size();
+    if (smallCompactions != null)
+      size += smallCompactions.getQueue().size();
+    return size;
   }
 
   private boolean shouldSplitRegion() {
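
The rewritten CompactSplitThread is driven entirely by the four configuration keys read in its constructor above: the large and small compaction pool sizes, the throttle size that routes requests between them, and the split pool size. A minimal sketch of setting them through HBase's standard Configuration API (the key names come from the patch; the values are purely illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CompactionPoolConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();

    // Dedicated pools: large compactions, small compactions, and splits.
    conf.setInt("hbase.regionserver.thread.compaction.large", 2);
    conf.setInt("hbase.regionserver.thread.compaction.small", 2);
    conf.setInt("hbase.regionserver.thread.split", 1);

    // Requests whose total size is below the throttle go to the small pool.
    // The Preconditions check in the constructor requires a positive throttle
    // whenever small-compaction threads are enabled.
    conf.setLong("hbase.regionserver.thread.compaction.throttle",
        2L * 1024 * 1024 * 1024); // 2 GB -- illustrative only
  }
}

In practice these would be set in hbase-site.xml on the region servers; leaving hbase.regionserver.thread.compaction.small at its default of 0 keeps a single compaction pool, which is closest to the old single-threaded behaviour.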

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue May 10 23:21:04 2011
@@ -890,7 +890,8 @@ public class HRegion implements HeapSize
             return false;
           }
         }
-        LOG.info("Starting compaction on region " + this);
+        LOG.info("Starting compaction on " + cr.getStore() + " in region "
+            + this);
         doRegionCompactionPrep();
         try {
           status.setStatus("Compacting store " + cr.getStore());
@@ -3707,6 +3708,20 @@ public class HRegion implements HeapSize
     // nothing
   }
 
+  byte[] checkSplit() {
+    if (this.splitPoint != null) {
+      return this.splitPoint;
+    }
+    byte[] splitPoint = null;
+    for (Store s : stores.values()) {
+      splitPoint = s.checkSplit();
+      if (splitPoint != null) {
+        return splitPoint;
+      }
+    }
+    return null;
+  }
+
   /**
    * @return The priority that this region should have in the compaction queue
    */

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue May 10 23:21:04 2011
@@ -226,7 +226,7 @@ public class HRegionServer implements HR
   private RegionServerMetrics metrics;
 
   // Compactions
-  CompactSplitThread compactSplitThread;
+  public CompactSplitThread compactSplitThread;
 
   // Cache flushing
   MemStoreFlusher cacheFlusher;
@@ -1017,7 +1017,7 @@ public class HRegionServer implements HR
    *
    * @return false if file system is not available
    */
-  protected boolean checkFileSystem() {
+  public boolean checkFileSystem() {
     if (this.fsOk && this.fs != null) {
       try {
         FSUtils.checkFileSystemAvailable(this.fs);
@@ -1247,8 +1247,6 @@ public class HRegionServer implements HR
     Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler);
     Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
       handler);
-    Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
-      handler);
     Threads.setDaemonThreadRunning(this.majorCompactionChecker, n +
       ".majorCompactionChecker", handler);
 
@@ -1316,7 +1314,7 @@ public class HRegionServer implements HR
       return false;
     }
     // Verify that all threads are alive
-    if (!(leases.isAlive() && compactSplitThread.isAlive()
+    if (!(leases.isAlive()
         && cacheFlusher.isAlive() && hlogRoller.isAlive()
         && this.majorCompactionChecker.isAlive())) {
       stop("One or more threads are no longer alive -- stop");
@@ -1434,8 +1432,10 @@ public class HRegionServer implements HR
   protected void join() {
     Threads.shutdown(this.majorCompactionChecker);
     Threads.shutdown(this.cacheFlusher);
-    Threads.shutdown(this.compactSplitThread);
     Threads.shutdown(this.hlogRoller);
+    if (this.compactSplitThread != null) {
+      this.compactSplitThread.join();
+    }
     if (this.service != null) this.service.shutdown();
     if (this.replicationHandler != null) {
       this.replicationHandler.join();
@@ -2338,11 +2338,7 @@ public class HRegionServer implements HR
     HRegion region = getRegion(regionInfo.getRegionName());
     region.flushcache();
     region.forceSplit(splitPoint);
-    // force a compaction, split will be side-effect
-    // TODO: flush/compact/split refactor will make it trivial to do this
-    // sync/async (and won't require us to do a compaction to split!)
-    compactSplitThread.requestCompaction(region, "User-triggered split",
-        CompactSplitThread.PRIORITY_USER);
+    compactSplitThread.requestSplit(region, region.checkSplit());
   }
 
   @Override

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Tue May 10 23:21:04 2011
@@ -354,7 +354,9 @@ class MemStoreFlusher extends Thread imp
           LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
         }
-        this.server.compactSplitThread.requestCompaction(region, getName());
+        if (!this.server.compactSplitThread.requestSplit(region)) {
+          this.server.compactSplitThread.requestCompaction(region, getName());
+        }
         // Put back on the queue.  Have it come back out of the queue
         // after a delay of this.blockingWaitTime / 100 ms.
         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java?rev=1101677&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java Tue May 10 23:21:04 2011
@@ -0,0 +1,76 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles processing region splits. Put in a queue, owned by HRegionServer.
+ */
+class SplitRequest implements Runnable {
+  static final Log LOG = LogFactory.getLog(SplitRequest.class);
+  private final HRegion parent;
+  private final byte[] midKey;
+  private final HRegionServer server;
+
+  SplitRequest(HRegion region, byte[] midKey, HRegionServer hrs) {
+    Preconditions.checkNotNull(hrs);
+    this.parent = region;
+    this.midKey = midKey;
+    this.server = hrs;
+  }
+
+  @Override
+  public String toString() {
+    return "regionName=" + parent + ", midKey=" + Bytes.toStringBinary(midKey);
+  }
+
+  @Override
+  public void run() {
+    try {
+      final long startTime = System.currentTimeMillis();
+      SplitTransaction st = new SplitTransaction(parent, midKey);
+      // If prepare does not return true, for some reason -- logged inside in
+      // the prepare call -- we are not ready to split just now. Just return.
+      if (!st.prepare()) return;
+      try {
+        st.execute(this.server, this.server);
+      } catch (Exception e) {
+        try {
+          LOG.info("Running rollback of failed split of " + parent + "; "
+              + e.getMessage());
+          st.rollback(this.server, this.server);
+          LOG.info("Successful rollback of failed split of " + parent);
+        } catch (RuntimeException ee) {
+          // If failed rollback, kill server to avoid having a hole in table.
+          LOG.info("Failed rollback of failed split of "
+              + parent.getRegionNameAsString() + " -- aborting server", ee);
+          this.server.abort("Failed split");
+        }
+        return;
+      }
+      LOG.info("Region split, META updated, and report to master. Parent="
+          + parent.getRegionInfo().getRegionNameAsString() + ", new regions: "
+          + st.getFirstDaughter().getRegionNameAsString() + ", "
+          + st.getSecondDaughter().getRegionNameAsString() + ". Split took "
+          + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
+    } catch (IOException ex) {
+      LOG.error("Split failed " + this, RemoteExceptionHandler
+          .checkIOException(ex));
+      server.checkFileSystem();
+    }
+  }
+
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue May 10 23:21:04 2011
@@ -695,7 +695,7 @@ public class Store implements HeapSize {
           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
           int idx = filesToCompact.indexOf(last);
           Preconditions.checkArgument(idx != -1);
-          filesToCompact = filesToCompact.subList(idx+1, filesToCompact.size());
+          filesToCompact.subList(0, idx + 1).clear();
         }
         int count = filesToCompact.size();
         if (N > count) {
@@ -868,7 +868,7 @@ public class Store implements HeapSize {
           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
           int idx = candidates.indexOf(last);
           Preconditions.checkArgument(idx != -1);
-          candidates = candidates.subList(idx + 1, candidates.size());
+          candidates.subList(0, idx + 1).clear();
         }
         List<StoreFile> filesToCompact = compactSelection(candidates);
 
@@ -974,6 +974,11 @@ public class Store implements HeapSize {
       int start = 0;
       double r = this.compactRatio;
 
+      // skip selection algorithm if we don't have enough files
+      if (filesToCompact.size() < this.minFilesToCompact) {
+        return Collections.emptyList();
+      }
+
       /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
       // Sort files by size to correct when normal skew is altered by bulk load.
       Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
@@ -1390,7 +1395,7 @@ public class Store implements HeapSize {
    * Determines if Store should be split
    * @return byte[] if store should be split, null otherwise.
    */
-  byte[] checkSplit() {
+  public byte[] checkSplit() {
     this.lock.readLock().lock();
     try {
       boolean force = this.region.shouldForceSplit();
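
The two one-line changes to Store.java above look cosmetic but are not: reassigning a local to candidates.subList(idx + 1, candidates.size()) merely rebinds the variable to a view and leaves the backing list intact, whereas candidates.subList(0, idx + 1).clear() removes the already-compacting files from the underlying list itself, so every reference to that list sees them gone. A self-contained illustration of the difference, using plain JDK lists and made-up file names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SubListDemo {
  public static void main(String[] args) {
    // Reassignment: the backing list is untouched.
    List<String> a = new ArrayList<>(Arrays.asList("f1", "f2", "f3", "f4"));
    List<String> tail = a.subList(2, a.size());
    System.out.println(tail); // [f3, f4]
    System.out.println(a);    // [f1, f2, f3, f4] -- still holds everything

    // In-place removal, as in the patch: the backing list itself shrinks.
    List<String> b = new ArrayList<>(Arrays.asList("f1", "f2", "f3", "f4"));
    b.subList(0, 2).clear();
    System.out.println(b);    // [f3, f4]
  }
}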

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1101677&r1=1101676&r2=1101677&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue May 10 23:21:04 2011
@@ -19,20 +19,33 @@
  */
 package org.apache.hadoop.hbase.regionserver.compactions;
 
+import java.io.IOException;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.StringUtils;
 
-  /**
-   * This class represents a compaction request and holds the region, priority,
-   * and time submitted.
-   */
-  public class CompactionRequest implements Comparable<CompactionRequest> {
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
+/**
+ * This class holds all details necessary to run a compaction.
+ */
+public class CompactionRequest implements Comparable<CompactionRequest>,
+    Runnable {
     static final Log LOG = LogFactory.getLog(CompactionRequest.class);
     private final HRegion r;
     private final Store s;
@@ -41,20 +54,12 @@ import org.apache.hadoop.hbase.regionser
     private final boolean isMajor;
     private int p;
     private final Date date;
-
-    public CompactionRequest(HRegion r, Store s) {
-      this(r, s, null, false, s.getCompactPriority());
-    }
-
-    public CompactionRequest(HRegion r, Store s, int p) {
-      this(r, s, null, false, p);
-    }
+    private HRegionServer server = null;
 
     public CompactionRequest(HRegion r, Store s,
         List<StoreFile> files, boolean isMajor, int p) {
-      if (r == null) {
-        throw new NullPointerException("HRegion cannot be null");
-      }
+      Preconditions.checkNotNull(r);
+      Preconditions.checkNotNull(files);
 
       this.r = r;
       this.s = s;
@@ -136,10 +141,77 @@ import org.apache.hadoop.hbase.regionser
       this.p = p;
     }
 
+    public void setServer(HRegionServer hrs) {
+      this.server = hrs;
+    }
+
+    @Override
     public String toString() {
+      String fsList = Joiner.on(", ").join(
+          Collections2.transform(Collections2.filter(files,
+              new Predicate<StoreFile>() {
+                public boolean apply(StoreFile sf) {
+                  return sf.getReader() != null;
+                }
+            }), new Function<StoreFile, String>() {
+              public String apply(StoreFile sf) {
+                return StringUtils.humanReadableInt(sf.getReader().length());
+              }
+            }));
+
       return "regionName=" + r.getRegionNameAsString() +
         ", storeName=" + new String(s.getFamily().getName()) +
         ", fileCount=" + files.size() +
+        ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
+          ((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
         ", priority=" + p + ", date=" + date;
     }
+
+    @Override
+    public void run() {
+      Preconditions.checkNotNull(server);
+      if (server.isStopped()) {
+        return;
+      }
+      try {
+        long start = EnvironmentEdgeManager.currentTimeMillis();
+        boolean completed = r.compact(this);
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
+              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
+        if (completed) {
+          server.getMetrics().addCompaction(now - start, this.totalSize);
+          // degenerate case: blocked regions require recursive enqueues
+          if (s.getCompactPriority() <= 0) {
+            server.compactSplitThread
+              .requestCompaction(r, s, "Recursive enqueue");
+          }
+        }
+      } catch (IOException ex) {
+        LOG.error("Compaction failed " + this, RemoteExceptionHandler
+            .checkIOException(ex));
+        server.checkFileSystem();
+      } catch (Exception ex) {
+        LOG.error("Compaction failed " + this, ex);
+        server.checkFileSystem();
+      } finally {
+        s.finishRequest(this);
+        LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
+      }
+    }
+
+    /**
+     * Cleanup class to use when rejecting a compaction request from the queue.
+     */
+    public static class Rejection implements RejectedExecutionHandler {
+
+      @Override
+      public void rejectedExecution(Runnable request, ThreadPoolExecutor pool) {
+        if (request instanceof CompactionRequest) {
+          CompactionRequest cr = (CompactionRequest) request;
+          LOG.debug("Compaction Rejected: " + cr);
+          cr.getStore().finishRequest(cr);
+        }
+      }
+    }
   }
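
Both compaction pools in CompactSplitThread are built on a PriorityBlockingQueue<Runnable>, which is why CompactionRequest now implements Runnable as well as Comparable: tasks handed to execute() are placed in the queue as-is and ordered by compareTo, and anything the pool refuses is routed through the Rejection handler above so the store's outstanding request is released. A generic, self-contained sketch of that pattern (the toy Task class and its priority ordering are illustrative only, not the actual CompactionRequest semantics):

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PriorityPoolDemo {
  // Toy stand-in for CompactionRequest: a Runnable that is also Comparable,
  // so a PriorityBlockingQueue can order queued instances.
  static class Task implements Runnable, Comparable<Task> {
    final int priority;   // lower value = more urgent, in this toy ordering
    final String name;
    Task(int priority, String name) { this.priority = priority; this.name = name; }
    @Override public int compareTo(Task other) { return Integer.compare(priority, other.priority); }
    @Override public void run() {
      try { Thread.sleep(100); } catch (InterruptedException ignored) { }
      System.out.println("ran " + name + " (priority=" + priority + ")");
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // One worker over a priority queue; queued tasks run in compareTo order.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
        60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
    pool.execute(new Task(1, "first submitted"));   // grabs the only worker
    pool.execute(new Task(5, "low priority"));      // queued
    pool.execute(new Task(-1, "urgent"));           // queued, runs before "low priority"
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}

Note that every task submitted to such a pool must be mutually comparable (or the queue must be given an explicit Comparator); otherwise the queue throws a ClassCastException when it tries to order them.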


