hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nspiegelb...@apache.org
Subject svn commit: r1101676 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ main/java/org/apache/hadoop/hbase/regionserver/metrics/ test/java/org/apache/hadoop/hbase/regionserver/
Date Tue, 10 May 2011 23:20:46 GMT
Author: nspiegelberg
Date: Tue May 10 23:20:45 2011
New Revision: 1101676

URL: http://svn.apache.org/viewvc?rev=1101676&view=rev
Log:
HBASE-3797 StoreFile Level Compaction Locking

Removed:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue May 10 23:20:45 2011
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -28,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
 
@@ -42,13 +45,14 @@ public class CompactSplitThread extends 
   private final HRegionServer server;
   private final Configuration conf;
 
-  private final PriorityCompactionQueue compactionQueue =
-    new PriorityCompactionQueue();
+  protected final BlockingQueue<CompactionRequest> compactionQueue =
+    new PriorityBlockingQueue<CompactionRequest>();
 
   /* The default priority for user-specified compaction requests.
    * The user gets top priority unless we have blocking compactions. (Pri <= 0)
    */
   public static final int PRIORITY_USER = 1;
+  public static final int NO_PRIORITY = Integer.MIN_VALUE;
 
   /**
    * Splitting should not take place if the total number of regions exceed this.
@@ -74,21 +78,36 @@ public class CompactSplitThread extends 
     while (!this.server.isStopped()) {
       CompactionRequest compactionRequest = null;
       HRegion r = null;
+      boolean completed = false;
       try {
         compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
         if (compactionRequest != null) {
+          r = compactionRequest.getHRegion();
           lock.lock();
           try {
+            // look for a split first
             if(!this.server.isStopped()) {
-              // Don't interrupt us while we are working
-              r = compactionRequest.getHRegion();
-              byte [] midKey = r.compactStore(compactionRequest.getStore());
-              if (r.getLastCompactInfo() != null) {  // compaction aborted?
-                this.server.getMetrics().addCompaction(r.getLastCompactInfo());
+              // don't split regions that are blocking
+              if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
+                byte[] midkey = compactionRequest.getStore().checkSplit();
+                if (midkey != null) {
+                  split(r, midkey);
+                  continue;
+                }
               }
-              if (shouldSplitRegion() && midKey != null &&
-                  !this.server.isStopped()) {
-                split(r, midKey);
+            }
+
+            // now test for compaction
+            if(!this.server.isStopped()) {
+              long startTime = EnvironmentEdgeManager.currentTimeMillis();
+              completed = r.compact(compactionRequest);
+              long now = EnvironmentEdgeManager.currentTimeMillis();
+              LOG.info(((completed) ? "completed" : "aborted")
+                  + " compaction: " + compactionRequest + ", duration="
+                  + StringUtils.formatTimeDiff(now, startTime));
+              if (completed) { // compaction aborted?
+                this.server.getMetrics().
+                  addCompaction(now - startTime, compactionRequest.getSize());
               }
             }
           } finally {
@@ -98,19 +117,26 @@ public class CompactSplitThread extends 
       } catch (InterruptedException ex) {
         continue;
       } catch (IOException ex) {
-        LOG.error("Compaction/Split failed for region " +
-            r.getRegionNameAsString(),
+        LOG.error("Compaction/Split failed " + compactionRequest,
           RemoteExceptionHandler.checkIOException(ex));
         if (!server.checkFileSystem()) {
           break;
         }
       } catch (Exception ex) {
-        LOG.error("Compaction failed" +
-            (r != null ? (" for region " + r.getRegionNameAsString()) : ""),
-            ex);
+        LOG.error("Compaction failed " + compactionRequest, ex);
         if (!server.checkFileSystem()) {
           break;
         }
+      } finally {
+        if (compactionRequest != null) {
+          Store s = compactionRequest.getStore();
+          s.finishRequest(compactionRequest);
+          // degenerate case: blocked regions require recursive enqueues
+          if (s.getCompactPriority() < PRIORITY_USER && completed) {
+            requestCompaction(r, s, "Recursive enqueue");
+          }
+        }
+        compactionRequest = null;
       }
     }
     compactionQueue.clear();
@@ -120,19 +146,19 @@ public class CompactSplitThread extends 
   public synchronized void requestCompaction(final HRegion r,
       final String why) {
     for(Store s : r.getStores().values()) {
-      requestCompaction(r, s, false, why, s.getCompactPriority());
+      requestCompaction(r, s, why, NO_PRIORITY);
     }
   }
 
-  public synchronized void requestCompaction(final HRegion r,
-      final String why, int p) {
-    requestCompaction(r, false, why, p);
+  public synchronized void requestCompaction(final HRegion r, final Store s,
+      final String why) {
+    requestCompaction(r, s, why, NO_PRIORITY);
   }
 
-  public synchronized void requestCompaction(final HRegion r,
-      final boolean force, final String why, int p) {
+  public synchronized void requestCompaction(final HRegion r, final String why,
+      int p) {
     for(Store s : r.getStores().values()) {
-      requestCompaction(r, s, force, why, p);
+      requestCompaction(r, s, why, p);
     }
   }
 
@@ -140,24 +166,28 @@ public class CompactSplitThread extends 
    * @param r HRegion store belongs to
    * @param force Whether next compaction should be major
    * @param why Why compaction requested -- used in debug messages
+   * @param priority override the default priority (NO_PRIORITY == decide)
    */
   public synchronized void requestCompaction(final HRegion r, final Store s,
-      final boolean force, final String why, int priority) {
+      final String why, int priority) {
     if (this.server.isStopped()) {
       return;
     }
-    // tell the region to major-compact (and don't downgrade it)
-    if (force) {
-      s.setForceMajorCompaction(force);
-    }
-    CompactionRequest compactionRequest = new CompactionRequest(r, s, priority);
-    if (compactionQueue.add(compactionRequest) && LOG.isDebugEnabled()) {
-      LOG.debug("Compaction " + (force? "(major) ": "") +
-        "requested for region " + r.getRegionNameAsString() +
-        "/" + r.getRegionInfo().getEncodedName() +
-        ", store " + s +
-        (why != null && !why.isEmpty()? " because " + why: "") +
-        "; priority=" + priority + ", compaction queue size=" + compactionQueue.size());
+    CompactionRequest cr = s.requestCompaction();
+    if (cr != null) {
+      if (priority != NO_PRIORITY) {
+        cr.setPriority(priority);
+      }
+      boolean addedToQueue = compactionQueue.add(cr);
+      if (!addedToQueue) {
+        LOG.error("Could not add request to compaction queue: " + cr);
+        s.finishRequest(cr);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("Compaction requested: " + cr
+            + (why != null && !why.isEmpty() ? "; Because: " + why : "")
+            + "; Priority: " + priority + "; Compaction queue size: "
+            + compactionQueue.size());
+      }
     }
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java Tue May 10 23:20:45 2011
@@ -28,8 +28,25 @@ public interface CompactionRequestor {
 
   /**
    * @param r Region to compact
+   * @param s Store within region to compact
+   * @param why Why compaction was requested -- used in debug messages
+   */
+  public void requestCompaction(final HRegion r, final Store s, final String why);
+
+  /**
+   * @param r Region to compact
    * @param why Why compaction was requested -- used in debug messages
    * @param pri Priority of this compaction. minHeap. <=0 is critical
    */
   public void requestCompaction(final HRegion r, final String why, int pri);
+
+  /**
+   * @param r Region to compact
+   * @param s Store within region to compact
+   * @param why Why compaction was requested -- used in debug messages
+   * @param pri Priority of this compaction. minHeap. <=0 is critical
+   */
+  public void requestCompaction(final HRegion r, final Store s,
+      final String why, int pri);
+
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue May 10 23:20:45 2011
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.ipc.Copro
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -101,6 +102,7 @@ import org.apache.hadoop.util.StringUtil
 
 import org.cliffc.high_scale_lib.Counter;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ClassToInstanceMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MutableClassToInstanceMap;
@@ -206,8 +208,8 @@ public class HRegion implements HeapSize
     volatile boolean flushing = false;
     // Set when a flush has been requested.
     volatile boolean flushRequested = false;
-    // Set while a compaction is running.
-    volatile boolean compacting = false;
+    // Number of compactions running.
+    volatile int compacting = 0;
     // Gets set in close. If set, cannot compact or flush again.
     volatile boolean writesEnabled = true;
     // Set if region is read-only
@@ -395,7 +397,7 @@ public class HRegion implements HeapSize
 
     this.writestate.setReadOnly(this.regionInfo.getTableDesc().isReadOnly());
 
-    this.writestate.compacting = false;
+    this.writestate.compacting = 0;
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
@@ -606,12 +608,10 @@ public class HRegion implements HeapSize
       writestate.writesEnabled = false;
       wasFlushing = writestate.flushing;
       LOG.debug("Closing " + this + ": disabling compactions & flushes");
-      while (writestate.compacting || writestate.flushing) {
-        LOG.debug("waiting for" +
-          (writestate.compacting ? " compaction" : "") +
-          (writestate.flushing ?
-            (writestate.compacting ? "," : "") + " cache flush" :
-              "") + " to complete for region " + this);
+      while (writestate.compacting > 0 || writestate.flushing) {
+        LOG.debug("waiting for " + writestate.compacting + " compactions" +
+            (writestate.flushing ? " & cache flush" : "") +
+            " to complete for region " + this);
         try {
           writestate.wait();
         } catch (InterruptedException iex) {
@@ -734,11 +734,6 @@ public class HRegion implements HeapSize
     return this.fs;
   }
 
-  /** @return info about the last compaction <time, size> */
-  public Pair<Long,Long> getLastCompactInfo() {
-    return this.lastCompactInfo;
-  }
-
   /** @return the last time the region was flushed */
   public long getLastFlushTime() {
     return this.lastFlushTime;
@@ -794,9 +789,9 @@ public class HRegion implements HeapSize
     return new Path(getRegionDir(), ".tmp");
   }
 
-  void setForceMajorCompaction(final boolean b) {
+  void triggerMajorCompaction() {
     for (Store h: stores.values()) {
-      h.setForceMajorCompaction(b);
+      h.triggerMajorCompaction();
     }
   }
 
@@ -817,7 +812,9 @@ public class HRegion implements HeapSize
    */
   byte [] compactStores(final boolean majorCompaction)
   throws IOException {
-    this.setForceMajorCompaction(majorCompaction);
+    if (majorCompaction) {
+      this.triggerMajorCompaction();
+    }
     return compactStores();
   }
 
@@ -826,13 +823,21 @@ public class HRegion implements HeapSize
    * to be split.
    */
   public byte[] compactStores() throws IOException {
-    byte[] splitRow = null;
     for(Store s : getStores().values()) {
-      if(splitRow == null) {
-        splitRow = compactStore(s);
+      CompactionRequest cr = s.requestCompaction();
+      if(cr != null) {
+        try {
+          compact(cr);
+        } finally {
+          s.finishRequest(cr);
+        }
+      }
+      byte[] splitRow = s.checkSplit();
+      if (splitRow != null) {
+        return splitRow;
       }
     }
-    return splitRow;
+    return null;
   }
 
   /*
@@ -846,93 +851,76 @@ public class HRegion implements HeapSize
    * conflicts with a region split, and that cannot happen because the region
    * server does them sequentially and not in parallel.
    *
-   * @return split row if split is needed
+   * @param cr Compaction details, obtained by requestCompaction()
+   * @return whether the compaction completed
    * @throws IOException e
    */
-  public byte [] compactStore(Store store) throws IOException {
-    if (this.closing.get()) {
-      LOG.debug("Skipping compaction on " + this + " because closing");
-      return null;
+  public boolean compact(CompactionRequest cr)
+  throws IOException {
+    if (cr == null) {
+      return false;
     }
+    if (this.closing.get() || this.closed.get()) {
+      LOG.debug("Skipping compaction on " + this + " because closing/closed");
+      return false;
+    }
+    Preconditions.checkArgument(cr.getHRegion().equals(this));
     lock.readLock().lock();
-    this.lastCompactInfo = null;
-    byte [] splitRow = null;
     MonitoredTask status = TaskMonitor.get().createStatus(
-        "Compacting stores in " + this);
+        "Compacting " + cr.getStore() + " in " + this);
     try {
       if (this.closed.get()) {
         LOG.debug("Skipping compaction on " + this + " because closed");
-        return null;
-      }
-      if (this.closed.get()) {
-        return splitRow;
+        return false;
       }
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor preCompact hooks");
         coprocessorHost.preCompact(false);
       }
+      boolean decr = true;
       try {
         synchronized (writestate) {
-          if (!writestate.compacting && writestate.writesEnabled) {
-            writestate.compacting = true;
+          if (writestate.writesEnabled) {
+            ++writestate.compacting;
           } else {
-            String msg = "NOT compacting region " + this +
-                ": compacting=" + writestate.compacting + ", writesEnabled=" +
-                writestate.writesEnabled;
+            String msg = "NOT compacting region " + this + ". Writes disabled.";
             LOG.info(msg);
             status.abort(msg);
-            return splitRow;
+            decr = false;
+            return false;
           }
         }
         LOG.info("Starting compaction on region " + this);
-        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         doRegionCompactionPrep();
-        long lastCompactSize = 0;
-        boolean completed = false;
         try {
-          status.setStatus("Compacting store " + store);
-          final Store.StoreSize ss = store.compact();
-          lastCompactSize += store.getLastCompactSize();
-          if (ss != null) {
-            splitRow = ss.getSplitRow();
-          }
-          completed = true;
+          status.setStatus("Compacting store " + cr.getStore());
+          cr.getStore().compact(cr);
         } catch (InterruptedIOException iioe) {
-          LOG.info("compaction interrupted by user: ", iioe);
-        } finally {
-          long now = EnvironmentEdgeManager.currentTimeMillis();
-          LOG.info(((completed) ? "completed" : "aborted")
-              + " compaction on region " + this
-              + " after " + StringUtils.formatTimeDiff(now, startTime));
-          if (completed) {
-            this.lastCompactInfo =
-              new Pair<Long,Long>((now - startTime) / 1000, lastCompactSize);
-            status.setStatus("Compaction complete: " +
-                StringUtils.humanReadableInt(lastCompactSize) + " in " +
-                (now - startTime) + "ms");
-          }
+          String msg = "compaction interrupted by user";
+          LOG.info(msg, iioe);
+          status.abort(msg);
+          return false;
         }
       } finally {
-        synchronized (writestate) {
-          writestate.compacting = false;
-          writestate.notifyAll();
+        if (decr) {
+          synchronized (writestate) {
+            --writestate.compacting;
+            if (writestate.compacting <= 0) {
+              writestate.notifyAll();
+            }
+          }
         }
       }
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor post-compact hooks");
-        coprocessorHost.postCompact(splitRow != null);
+        coprocessorHost.postCompact(false);
       }
-      
       status.markComplete("Compaction complete");
+      return true;
     } finally {
       status.cleanup();
       lock.readLock().unlock();
     }
-    if (splitRow != null) {
-      assert splitPoint == null || Bytes.equals(splitRow, splitPoint);
-      this.splitPoint = null; // clear the split point (if set)
-    }
-    return splitRow;
   }
 
   /**
@@ -3708,6 +3696,10 @@ public class HRegion implements HeapSize
     }
   }
 
+  void clearSplit_TESTS_ONLY() {
+    this.splitRequest = false;
+  }
+
   /**
    * Give the region a chance to prepare before it is split.
    */
@@ -3731,9 +3723,9 @@ public class HRegion implements HeapSize
    * store files
    * @return true if any store has too many store files
    */
-  public boolean hasTooManyStoreFiles() {
+  public boolean needsCompaction() {
     for(Store store : stores.values()) {
-      if(store.hasTooManyStoreFiles()) {
+      if(store.needsCompaction()) {
         return true;
       }
     }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue May 10 23:20:45 2011
@@ -1046,14 +1046,18 @@ public class HRegionServer implements HR
     @Override
     protected void chore() {
       for (HRegion r : this.instance.onlineRegions.values()) {
-        try {
-          if (r != null && r.isMajorCompaction()) {
-            // Queue a compaction. Will recognize if major is needed.
-            this.instance.compactSplitThread.requestCompaction(r, getName()
-              + " requests major compaction");
+        if (r == null)
+          continue;
+        for (Store s : r.getStores().values()) {
+          try {
+            if (s.isMajorCompaction()) {
+              // Queue a compaction. Will recognize if major is needed.
+              this.instance.compactSplitThread.requestCompaction(r, s,
+                  getName() + " requests major compaction");
+            }
+          } catch (IOException e) {
+            LOG.warn("Failed major compaction check on " + r, e);
           }
-        } catch (IOException e) {
-          LOG.warn("Failed major compaction check on " + r, e);
         }
       }
     }
@@ -1346,10 +1350,10 @@ public class HRegionServer implements HR
       final boolean daughter)
   throws KeeperException, IOException {
     // Do checks to see if we need to compact (references or too many files)
-    if (r.hasReferences() || r.hasTooManyStoreFiles()) {
-      getCompactionRequester().requestCompaction(r,
-        r.hasReferences()? "Region has references on open" :
-          "Region has too many store files");
+    for (Store s : r.getStores().values()) {
+      if (s.hasReferences() || s.needsCompaction()) {
+        getCompactionRequester().requestCompaction(r, s, "Opening Region");
+      }
     }
 
     // Add to online regions if all above was successful.
@@ -2346,7 +2350,10 @@ public class HRegionServer implements HR
   public void compactRegion(HRegionInfo regionInfo, boolean major)
       throws NotServingRegionException, IOException {
     HRegion region = getRegion(regionInfo.getRegionName());
-    compactSplitThread.requestCompaction(region, major, "User-triggered "
+    if (major) {
+      region.triggerMajorCompaction();
+    }
+    compactSplitThread.requestCompaction(region, "User-triggered "
         + (major ? "major " : "") + "compaction",
         CompactSplitThread.PRIORITY_USER);
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue May 10 23:20:45 2011
@@ -24,8 +24,10 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -48,16 +50,20 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * A Store holds a column family in a Region.  Its a memstore and a set of zero
@@ -102,7 +108,7 @@ public class Store implements HeapSize {
   // With float, java will downcast your long to float for comparisons (bad)
   private double compactRatio;
   private long lastCompactSize = 0;
-  private volatile boolean forceMajor = false;
+  volatile boolean forceMajor = false;
   /* how many bytes to write between status checks */
   static int closeCheckInterval = 0;
   private final long desiredMaxFileSize;
@@ -119,12 +125,12 @@ public class Store implements HeapSize {
    */
   private ImmutableList<StoreFile> storefiles = null;
 
+  List<StoreFile> filesCompacting = Lists.newArrayList();
 
   // All access must be synchronized.
   private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
     new CopyOnWriteArraySet<ChangedReadersObserver>();
 
-  private final Object compactLock = new Object();
   private final int blocksize;
   private final boolean blockcache;
   /** Compression algorithm for flush files and minor compaction */
@@ -569,7 +575,7 @@ public class Store implements HeapSize {
       // Tell listeners of the change in readers.
       notifyChangedReadersObservers();
 
-      return this.storefiles.size() >= this.minFilesToCompact;
+      return needsCompaction();
     } finally {
       this.lock.writeLock().unlock();
     }
@@ -620,99 +626,108 @@ public class Store implements HeapSize {
    * <p>We don't want to hold the structureLock for the whole time, as a compact()
    * can be lengthy and we want to allow cache-flushes during this period.
    *
-   * @return row to split around if a split is needed, null otherwise
+   * @param CompactionRequest
+   *          compaction details obtained from requestCompaction()
    * @throws IOException
    */
-  StoreSize compact() throws IOException {
-    boolean forceSplit = this.region.shouldForceSplit();
-    synchronized (compactLock) {
-      this.lastCompactSize = 0; // reset first in case compaction is aborted
+  void compact(CompactionRequest cr) throws IOException {
+    if (cr == null || cr.getFiles().isEmpty()) {
+      return;
+    }
+    Preconditions.checkArgument(cr.getStore().toString()
+        .equals(this.toString()));
 
-      // sanity checks
-      for (StoreFile sf : this.storefiles) {
-        if (sf.getPath() == null || sf.getReader() == null) {
-          boolean np = sf.getPath() == null;
-          LOG.debug("StoreFile " + sf + " has null " + (np ? "Path":"Reader"));
-          return null;
-        }
-      }
+    List<StoreFile> filesToCompact = cr.getFiles();
+
+    synchronized (filesCompacting) {
+      // sanity check: we're compacting files that this store knows about
+      // TODO: change this to LOG.error() after more debugging
+      Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
+    }
+
+    // Max-sequenceID is the last key in the files we're compacting
+    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
 
-      // if the user wants to force a split, skip compaction unless necessary
-      boolean references = hasReferences(this.storefiles);
-      if (forceSplit && !this.forceMajor && !references) {
-        return checkSplit(forceSplit);
-      }
-
-      Collection<StoreFile> filesToCompact
-        = compactSelection(this.storefiles, this.forceMajor);
-
-      // empty == do not compact
-      if (filesToCompact.isEmpty()) {
-        // but do see if we need to split before returning
-        return checkSplit(forceSplit);
-      }
-
-      // sum size of all files included in compaction
-      long totalSize = 0;
-      for (StoreFile sf : filesToCompact) {
-        totalSize += sf.getReader().length();
-      }
-      this.lastCompactSize = totalSize;
-
-      // major compaction iff all StoreFiles are included
-      boolean majorcompaction
-        = (filesToCompact.size() == this.storefiles.size());
-      if (majorcompaction) {
-        this.forceMajor = false;
-      }
-
-      // Max-sequenceID is the last key in the files we're compacting
-      long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
-
-      // Ready to go.  Have list of files to compact.
-      LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in cf=" +
-          this.storeNameStr +
-        (hasReferences(filesToCompact)? ", hasReferences=true,": " ") + " into " +
-          region.getTmpDir() + ", seqid=" + maxId +
-          ", totalSize=" + StringUtils.humanReadableInt(totalSize));
-      StoreFile.Writer writer
-        = compactStore(filesToCompact, majorcompaction, maxId);
+    // Ready to go. Have list of files to compact.
+    LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+        + this.storeNameStr + " of "
+        + this.region.getRegionInfo().getRegionNameAsString()
+        + " into " + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+        + StringUtils.humanReadableInt(cr.getSize()));
+
+    StoreFile sf = null;
+    try {
+      StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
+          maxId);
       // Move the compaction into place.
-      StoreFile sf = completeCompaction(filesToCompact, writer);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Completed" + (majorcompaction? " major ": " ") +
-          "compaction of " + filesToCompact.size() +
-          " file(s), new file=" + (sf == null? "none": sf.toString()) +
-          ", size=" + (sf == null? "none": StringUtils.humanReadableInt(sf.getReader().length())) +
-          "; total size for store is " + StringUtils.humanReadableInt(storeSize));
+      sf = completeCompaction(filesToCompact, writer);
+    } finally {
+      synchronized (filesCompacting) {
+        filesCompacting.removeAll(filesToCompact);
       }
     }
-    return checkSplit(forceSplit);
+
+    LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+        + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+        + this.region.getRegionInfo().getRegionNameAsString()
+        + "; new storefile name=" + (sf == null ? "none" : sf.toString())
+        + ", size=" + (sf == null ? "none" :
+          StringUtils.humanReadableInt(sf.getReader().length()))
+        + "; total size for store is "
+        + StringUtils.humanReadableInt(storeSize));
   }
 
   /*
    * Compact the most recent N files. Essentially a hook for testing.
    */
   protected void compactRecent(int N) throws IOException {
-    synchronized(compactLock) {
-      List<StoreFile> filesToCompact = this.storefiles;
-      int count = filesToCompact.size();
-      if (N > count) {
-        throw new RuntimeException("Not enough files");
+    List<StoreFile> filesToCompact;
+    long maxId;
+    boolean isMajor;
+
+    this.lock.readLock().lock();
+    try {
+      synchronized (filesCompacting) {
+        filesToCompact = Lists.newArrayList(storefiles);
+        if (!filesCompacting.isEmpty()) {
+          // exclude all files older than the newest file we're currently
+          // compacting. this allows us to preserve contiguity (HBASE-2856)
+          StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
+          int idx = filesToCompact.indexOf(last);
+          Preconditions.checkArgument(idx != -1);
+          filesToCompact = filesToCompact.subList(idx+1, filesToCompact.size());
+        }
+        int count = filesToCompact.size();
+        if (N > count) {
+          throw new RuntimeException("Not enough files");
+        }
+
+        filesToCompact = filesToCompact.subList(count - N, count);
+        maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+        isMajor = (filesToCompact.size() == storefiles.size());
+        filesCompacting.addAll(filesToCompact);
+        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
       }
+    } finally {
+      this.lock.readLock().unlock();
+    }
 
-      filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(count-N, count));
-      long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
-      boolean majorcompaction = (N == count);
-
-      // Ready to go.  Have list of files to compact.
-      StoreFile.Writer writer
-        = compactStore(filesToCompact, majorcompaction, maxId);
+    try {
+      // Ready to go. Have list of files to compact.
+      StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
+    } finally {
+      synchronized (filesCompacting) {
+        filesCompacting.removeAll(filesToCompact);
+      }
     }
   }
 
+  boolean hasReferences() {
+    return hasReferences(this.storefiles);
+  }
+
   /*
    * @param files
    * @return True if any of the files in <code>files</code> are References.
@@ -835,6 +850,69 @@ public class Store implements HeapSize {
     return ret;
   }
 
+  public CompactionRequest requestCompaction() {
+    // don't even select for compaction if writes are disabled
+    if (!this.region.areWritesEnabled()) {
+      return null;
+    }
+
+    CompactionRequest ret = null;
+    this.lock.readLock().lock();
+    try {
+      synchronized (filesCompacting) {
+        // candidates = all storefiles not already in compaction queue
+        List<StoreFile> candidates = Lists.newArrayList(storefiles);
+        if (!filesCompacting.isEmpty()) {
+          // exclude all files older than the newest file we're currently
+          // compacting. this allows us to preserve contiguity (HBASE-2856)
+          StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
+          int idx = candidates.indexOf(last);
+          Preconditions.checkArgument(idx != -1);
+          candidates = candidates.subList(idx + 1, candidates.size());
+        }
+        List<StoreFile> filesToCompact = compactSelection(candidates);
+
+        // no files to compact
+        if (filesToCompact.isEmpty()) {
+          return null;
+        }
+
+        // basic sanity check: do not try to compact the same StoreFile twice.
+        if (!Collections.disjoint(filesCompacting, filesToCompact)) {
+          // TODO: change this from an IAE to LOG.error after sufficient testing
+          Preconditions.checkArgument(false, "%s overlaps with %s",
+              filesToCompact, filesCompacting);
+        }
+        filesCompacting.addAll(filesToCompact);
+        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
+
+        // major compaction iff all StoreFiles are included
+        boolean isMajor = (filesToCompact.size() == this.storefiles.size());
+        if (isMajor) {
+          // since we're enqueuing a major, update the compaction wait interval
+          this.forceMajor = false;
+          this.majorCompactionTime = getNextMajorCompactTime();
+        }
+
+        // everything went better than expected. create a compaction request
+        int pri = getCompactPriority();
+        ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
+      }
+    } catch (IOException ex) {
+      LOG.error("Compaction Request failed for region " + region + ", store "
+          + this, RemoteExceptionHandler.checkIOException(ex));
+    } finally {
+      this.lock.readLock().unlock();
+    }
+    return ret;
+  }
+
+  public void finishRequest(CompactionRequest cr) {
+    synchronized (filesCompacting) {
+      filesCompacting.removeAll(cr.getFiles());
+    }
+  }
+
   /**
    * Algorithm to choose which files to compact
    *
@@ -851,12 +929,13 @@ public class Store implements HeapSize {
    *    max files to compact at once (avoids OOM)
    *
    * @param candidates candidate files, ordered from oldest to newest
-   * @param majorcompaction whether to force a major compaction
    * @return subset copy of candidate list that meets compaction criteria
    * @throws IOException
    */
-  List<StoreFile> compactSelection(List<StoreFile> candidates,
-      boolean forcemajor) throws IOException {
+  List<StoreFile> compactSelection(List<StoreFile> candidates)
+      throws IOException {
+    // ASSUMPTION!!! filesCompacting is locked when calling this function
+
     /* normal skew:
      *
      *         older ----> newer
@@ -870,6 +949,7 @@ public class Store implements HeapSize {
      */
     List<StoreFile> filesToCompact = new ArrayList<StoreFile>(candidates);
 
+    boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
     if (!forcemajor) {
       // do not compact old files above a configurable threshold
       // save all references. we MUST compact them
@@ -888,9 +968,6 @@ public class Store implements HeapSize {
     // major compact on user action or age (caveat: we have too many files)
     boolean majorcompaction = (forcemajor || isMajorCompaction(filesToCompact))
       && filesToCompact.size() < this.maxFilesToCompact;
-    if (majorcompaction) {
-      this.majorCompactionTime = getNextMajorCompactTime();
-    }
 
     if (!majorcompaction && !hasReferences(filesToCompact)) {
       // we're doing a minor compaction, let's see what files are applicable
@@ -1054,9 +1131,6 @@ public class Store implements HeapSize {
   }
 
   /*
-   * It's assumed that the compactLock  will be acquired prior to calling this
-   * method!  Otherwise, it is not thread-safe!
-   *
    * <p>It works by processing a compaction that's been written to disk.
    *
    * <p>It is usually invoked at the end of a compaction, but might also be
@@ -1097,18 +1171,13 @@ public class Store implements HeapSize {
     this.lock.writeLock().lock();
     try {
       try {
-        // 2. Unloading
-        // 3. Loading the new TreeMap.
         // Change this.storefiles so it reflects new state but do not
         // delete old store files until we have sent out notification of
         // change in case old files are still being accessed by outstanding
         // scanners.
-        ArrayList<StoreFile> newStoreFiles = new ArrayList<StoreFile>();
-        for (StoreFile sf : storefiles) {
-          if (!compactedFiles.contains(sf)) {
-            newStoreFiles.add(sf);
-          }
-        }
+        ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
+        newStoreFiles.removeAll(compactedFiles);
+        filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
 
         // If a StoreFile result, move it into place.  May be null.
         if (result != null) {
@@ -1318,13 +1387,13 @@ public class Store implements HeapSize {
   }
 
   /**
-   * Determines if HStore can be split
-   * @param force Whether to force a split or not.
-   * @return a StoreSize if store can be split, null otherwise.
+   * Determines if Store should be split
+   * @return byte[] if store should be split, null otherwise.
    */
-  StoreSize checkSplit(final boolean force) {
+  byte[] checkSplit() {
     this.lock.readLock().lock();
     try {
+      boolean force = this.region.shouldForceSplit();
       // sanity checks
       if (this.storefiles.isEmpty()) {
         return null;
@@ -1369,7 +1438,7 @@ public class Store implements HeapSize {
       }
       // if the user explicit set a split point, use that
       if (this.region.getSplitPoint() != null) {
-        return new StoreSize(maxSize, this.region.getSplitPoint());
+        return this.region.getSplitPoint();
       }
       StoreFile.Reader r = largestSf.getReader();
       if (r == null) {
@@ -1396,7 +1465,7 @@ public class Store implements HeapSize {
           }
           return null;
         }
-        return new StoreSize(maxSize, mk.getRow());
+        return mk.getRow();
       }
     } catch(IOException e) {
       LOG.warn("Failed getting store size for " + this.storeNameStr, e);
@@ -1416,8 +1485,8 @@ public class Store implements HeapSize {
     return storeSize;
   }
 
-  void setForceMajorCompaction(final boolean b) {
-    this.forceMajor = b;
+  void triggerMajorCompaction() {
+    this.forceMajor = true;
   }
 
   boolean getForceMajorCompaction() {
@@ -1493,28 +1562,6 @@ public class Store implements HeapSize {
     return this.blockingStoreFileCount - this.storefiles.size();
   }
 
-  /**
-   * Datastructure that holds size and row to split a file around.
-   * TODO: Take a KeyValue rather than row.
-   */
-  static class StoreSize {
-    private final long size;
-    private final byte [] row;
-
-    StoreSize(long size, byte [] row) {
-      this.size = size;
-      this.row = row;
-    }
-    /* @return the size */
-    long getSize() {
-      return size;
-    }
-
-    byte [] getSplitRow() {
-      return this.row;
-    }
-  }
-
   HRegion getHRegion() {
     return this.region;
   }
@@ -1624,8 +1671,8 @@ public class Store implements HeapSize {
    * @return true if number of store files is greater than
    *  the number defined in minFilesToCompact
    */
-  public boolean hasTooManyStoreFiles() {
-    return this.storefiles.size() > this.minFilesToCompact;
+  public boolean needsCompaction() {
+    return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.align(

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue May 10 23:20:45 2011
@@ -20,11 +20,13 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.util.Date;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 
   /**
    * This class represents a compaction request and holds the region, priority,
@@ -34,30 +36,37 @@ import org.apache.hadoop.hbase.regionser
     static final Log LOG = LogFactory.getLog(CompactionRequest.class);
     private final HRegion r;
     private final Store s;
+    private final List<StoreFile> files;
+    private final long totalSize;
+    private final boolean isMajor;
     private int p;
     private final Date date;
 
     public CompactionRequest(HRegion r, Store s) {
-      this(r, s, s.getCompactPriority());
+      this(r, s, null, false, s.getCompactPriority());
     }
 
     public CompactionRequest(HRegion r, Store s, int p) {
-      this(r, s, p, null);
+      this(r, s, null, false, p);
     }
 
-    public CompactionRequest(HRegion r, Store s, int p, Date d) {
+    public CompactionRequest(HRegion r, Store s,
+        List<StoreFile> files, boolean isMajor, int p) {
       if (r == null) {
         throw new NullPointerException("HRegion cannot be null");
       }
 
-      if (d == null) {
-        d = new Date();
-      }
-
       this.r = r;
       this.s = s;
+      this.files = files;
+      long sz = 0;
+      for (StoreFile sf : files) {
+        sz += sf.getReader().length();
+      }
+      this.totalSize = sz;
+      this.isMajor = isMajor;
       this.p = p;
-      this.date = d;
+      this.date = new Date();
     }
 
     /**
@@ -89,8 +98,8 @@ import org.apache.hadoop.hbase.regionser
         return compareVal;
       }
 
-      //break the tie arbitrarily
-      return -1;
+      // break the tie based on hash code
+      return this.hashCode() - request.hashCode();
     }
 
     /** Gets the HRegion for the request */
@@ -103,6 +112,20 @@ import org.apache.hadoop.hbase.regionser
       return s;
     }
 
+    /** Gets the StoreFiles for the request */
+    public List<StoreFile> getFiles() {
+      return files;
+    }
+
+    /** Gets the total size of all StoreFiles in compaction */
+    public long getSize() {
+      return totalSize;
+    }
+
+    public boolean isMajor() {
+      return this.isMajor;
+    }
+
     /** Gets the priority for the request */
     public int getPriority() {
       return p;
@@ -115,8 +138,8 @@ import org.apache.hadoop.hbase.regionser
 
     public String toString() {
       return "regionName=" + r.getRegionNameAsString() +
-        ((s == null) ? ""
-                     : "storeName = " + new String(s.getFamily().getName())) +
+        ", storeName=" + new String(s.getFamily().getName()) +
+        ", fileCount=" + files.size() +
         ", priority=" + p + ", date=" + date;
     }
   }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java Tue May 10 23:20:45 2011
@@ -314,11 +314,12 @@ public class RegionServerMetrics impleme
   }
 
   /**
-   * @param compact history in <time, size>
+   * @param time time that compaction took
+   * @param size bytesize of storefiles in the compaction
    */
-  public synchronized void addCompaction(final Pair<Long,Long> compact) {
-     this.compactionTime.inc(compact.getFirst());
-     this.compactionSize.inc(compact.getSecond());
+  public synchronized void addCompaction(long time, long size) {
+    this.compactionTime.inc(time);
+    this.compactionSize.inc(size);
   }
 
   /**

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java Tue May 10 23:20:45 2011
@@ -158,7 +158,9 @@ public class TestCompactSelection extend
   void compactEquals(List<StoreFile> candidates, boolean forcemajor, 
       long ... expected)
   throws IOException {
-    List<StoreFile> actual = store.compactSelection(candidates, forcemajor);
+    store.forceMajor = forcemajor;
+    List<StoreFile> actual = store.compactSelection(candidates);
+    store.forceMajor = false;
     assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
   }
 
@@ -187,7 +189,7 @@ public class TestCompactSelection extend
      */
     // don't exceed max file compact threshold
     assertEquals(maxFiles,
-        store.compactSelection(sfCreate(7,6,5,4,3,2,1), false).size());
+        store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());
 
     /* MAJOR COMPACTION */
     // if a major compaction has been forced, then compact everything
@@ -197,8 +199,11 @@ public class TestCompactSelection extend
     // even if one of those files is too big
     compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12);
     // don't exceed max file compact threshold, even with major compaction
+    store.forceMajor = true;
     assertEquals(maxFiles,
-        store.compactSelection(sfCreate(7,6,5,4,3,2,1), true).size());
+        store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());
+    store.forceMajor = false;
+
     // if we exceed maxCompactSize, downgrade to minor
     // if not, it creates a 'snowball effect' when files >> maxCompactSize:
     // the last file in compaction is the aggregate of all previous compactions
@@ -217,7 +222,7 @@ public class TestCompactSelection extend
     compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
     // reference files should obey max file compact to avoid OOM
     assertEquals(maxFiles,
-        store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1), false).size());
+        store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).size());
 
     // empty case
     compactEquals(new ArrayList<StoreFile>() /* empty */);

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1101676&r1=1101675&r2=1101676&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Tue May 10 23:20:45 2011
@@ -156,7 +156,7 @@ public class TestStore extends TestCase 
     assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
     
     // after compact; check the lowest time stamp
-    store.compact();
+    store.compact(store.requestCompaction());
     lowestTimeStampFromStore = Store.getLowestTimestamp(store.getStorefiles());
     lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
     assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS); 
@@ -688,7 +688,9 @@ public class TestStore extends TestCase 
    */
   public void testSplitWithEmptyColFam() throws IOException {
     init(this.getName());
-    assertNull(store.checkSplit(false));
-    assertNull(store.checkSplit(true));
+    assertNull(store.checkSplit());
+    store.getHRegion().forceSplit(null);
+    assertNull(store.checkSplit());
+    store.getHRegion().clearSplit_TESTS_ONLY();
   }
 }



Mime
View raw message