hbase-commits mailing list archives

From jxi...@apache.org
Subject svn commit: r1440737 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ test/java/org/apache/hadoop/hbase/regionserver/
Date Wed, 30 Jan 2013 23:53:43 GMT
Author: jxiang
Date: Wed Jan 30 23:53:42 2013
New Revision: 1440737

URL: http://svn.apache.org/viewvc?rev=1440737&view=rev
Log:
HBASE-7516 Make compaction policy pluggable

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java   (with props)
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java   (with props)
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java   (with props)
Removed:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java

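HBASE-7516 makes the compaction policy a per-store pluggable component: CompactionPolicy becomes an abstract class instantiated through the new hbase.hstore.compaction.policy configuration key (see the CompactionPolicy.java diff below), and the policy supplies the Compactor that actually rewrites the files. A minimal, hypothetical configuration sketch, assuming an embedded client; com.example.MyCompactionPolicy is a placeholder class name, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class PluggablePolicyConfig {
      public static Configuration withCustomPolicy() {
        Configuration conf = HBaseConfiguration.create();
        // New key from this commit; when it is unset, CompactionPolicy.create()
        // falls back to DefaultCompactionPolicy. The class name is a placeholder.
        conf.set("hbase.hstore.compaction.policy", "com.example.MyCompactionPolicy");
        return conf;
      }
    }

In a real deployment the same property would normally be set in hbase-site.xml rather than programmatically.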
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java?rev=1440737&r1=1440736&r2=1440737&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java Wed Jan 30 23:53:42 2013
@@ -157,10 +157,12 @@ public class CompactionTool extends Conf
       HStore store = getStore(region, familyDir);
       do {
         CompactionRequest cr = store.requestCompaction();
-        StoreFile storeFile = store.compact(cr);
-        if (storeFile != null) {
+        List<StoreFile> storeFiles = store.compact(cr);
+        if (storeFiles != null && !storeFiles.isEmpty()) {
           if (keepCompactedFiles && deleteCompacted) {
-            fs.delete(storeFile.getPath(), false);
+            for (StoreFile storeFile: storeFiles) {
+              fs.delete(storeFile.getPath(), false);
+            }
           }
         }
       } while (store.needsCompaction() && !compactOnce);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1440737&r1=1440736&r2=1440737&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Jan 30 23:53:42 2013
@@ -915,7 +915,7 @@ public class HRegion implements HeapSize
     return isAvailable() && !hasReferences();
   }
 
-  boolean areWritesEnabled() {
+  public boolean areWritesEnabled() {
     synchronized(this.writestate) {
       return this.writestate.writesEnabled;
     }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1440737&r1=1440736&r2=1440737&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Wed Jan 30 23:53:42 2013
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -130,8 +131,8 @@ public class HStore implements Store, St
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final boolean verifyBulkLoads;
 
-  // not private for testing
-  /* package */ScanInfo scanInfo;
+  private ScanInfo scanInfo;
+
   /*
    * List of store files inside this store. This is an immutable list that
    * is atomically replaced when its contents change.
@@ -154,7 +155,7 @@ public class HStore implements Store, St
   // Comparing KeyValues
   final KeyValue.KVComparator comparator;
 
-  private final Compactor compactor;
+  private Compactor compactor;
 
   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
   private static int flush_retries_number;
@@ -227,10 +228,7 @@ public class HStore implements Store, St
     this.checksumType = getChecksumType(conf);
     // initilize bytes per checksum
     this.bytesPerChecksum = getBytesPerChecksum(conf);
-    // Create a compaction tool instance
-    this.compactor = new Compactor(conf);
     // Create a compaction manager.
-    this.compactionPolicy = new CompactionPolicy(conf, this);
     if (HStore.flush_retries_number == 0) {
       HStore.flush_retries_number = conf.getInt(
           "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
@@ -242,6 +240,9 @@ public class HStore implements Store, St
                 + HStore.flush_retries_number);
       }
     }
+    this.compactionPolicy = CompactionPolicy.create(this, conf);
+    // Get the compaction tool instance for this policy
+    this.compactor = compactionPolicy.getCompactor();
   }
 
   /**
@@ -288,7 +289,7 @@ public class HStore implements Store, St
     return homedir;
   }
 
-  FileSystem getFileSystem() {
+  public FileSystem getFileSystem() {
     return this.fs;
   }
 
@@ -332,6 +333,13 @@ public class HStore implements Store, St
     }
   }
 
+  /**
+   * @return how many bytes to write between status checks
+   */
+  public static int getCloseCheckInterval() {
+    return closeCheckInterval;
+  }
+
   public HColumnDescriptor getFamily() {
     return this.family;
   }
@@ -933,7 +941,7 @@ public class HStore implements Store, St
    * @param isCompaction whether we are creating a new file in a compaction
    * @return Writer for a new StoreFile in the tmp dir.
    */
-  StoreFile.Writer createWriterInTmp(int maxKeyCount,
+  public StoreFile.Writer createWriterInTmp(int maxKeyCount,
     Compression.Algorithm compression, boolean isCompaction)
   throws IOException {
     final CacheConfig writerCacheConf;
@@ -1074,7 +1082,7 @@ public class HStore implements Store, St
    * @throws IOException
    * @return Storefile we compacted into or null if we failed or opted out early.
    */
-  StoreFile compact(CompactionRequest cr) throws IOException {
+  List<StoreFile> compact(CompactionRequest cr) throws IOException {
     if (cr == null || cr.getFiles().isEmpty()) return null;
     Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
     List<StoreFile> filesToCompact = cr.getFiles();
@@ -1084,31 +1092,34 @@ public class HStore implements Store, St
       Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
     }
 
-    // Max-sequenceID is the last key in the files we're compacting
-    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
-
     // Ready to go. Have list of files to compact.
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
         + this + " of " + this.region.getRegionInfo().getRegionNameAsString()
-        + " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+        + " into tmpdir=" + region.getTmpDir() + ", totalSize="
         + StringUtils.humanReadableInt(cr.getSize()));
 
-    StoreFile sf = null;
+    List<StoreFile> sfs = new ArrayList<StoreFile>();
     long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
     try {
-      StoreFile.Writer writer =
-        this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
+      List<Path> newFiles =
+        this.compactor.compact(filesToCompact, cr.isMajor());
       // Move the compaction into place.
       if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
-        sf = completeCompaction(filesToCompact, writer);
-        if (region.getCoprocessorHost() != null) {
-          region.getCoprocessorHost().postCompact(this, sf);
+        for (Path newFile: newFiles) {
+          StoreFile sf = completeCompaction(filesToCompact, newFile);
+          if (region.getCoprocessorHost() != null) {
+            region.getCoprocessorHost().postCompact(this, sf);
+          }
+          sfs.add(sf);
         }
       } else {
-        // Create storefile around what we wrote with a reader on it.
-        sf = new StoreFile(this.fs, writer.getPath(), this.conf, this.cacheConf,
-          this.family.getBloomFilterType(), this.dataBlockEncoder);
-        sf.createReader();
+        for (Path newFile: newFiles) {
+          // Create storefile around what we wrote with a reader on it.
+          StoreFile sf = new StoreFile(this.fs, newFile, this.conf, this.cacheConf,
+            this.family.getBloomFilterType(), this.dataBlockEncoder);
+          sf.createReader();
+          sfs.add(sf);
+        }
       }
     } finally {
       synchronized (filesCompacting) {
@@ -1117,25 +1128,34 @@ public class HStore implements Store, St
     }
 
     long now = EnvironmentEdgeManager.currentTimeMillis();
-    LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
-        + filesToCompact.size() + " file(s) in " + this + " of "
-        + this.region.getRegionInfo().getRegionNameAsString()
-        + " into " +
-        (sf == null ? "none" : sf.getPath().getName()) +
-        ", size=" + (sf == null ? "none" :
-          StringUtils.humanReadableInt(sf.getReader().length()))
-        + "; total size for store is " + StringUtils.humanReadableInt(storeSize)
-        + ". This selection was in queue for "
-        + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())
-        + ", and took " + StringUtils.formatTimeDiff(now, compactionStartTime)
-        + " to execute.");
-    return sf;
+    StringBuilder message = new StringBuilder(
+      "Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+      + filesToCompact.size() + " file(s) in " + this + " of "
+      + this.region.getRegionInfo().getRegionNameAsString()
+      + " into ");
+    if (sfs.isEmpty()) {
+      message.append("none, ");
+    } else {
+      for (StoreFile sf: sfs) {
+        message.append(sf.getPath().getName());
+        message.append("(size=");
+        message.append(StringUtils.humanReadableInt(sf.getReader().length()));
+        message.append("), ");
+      }
+    }
+    message.append("total size for store is ")
+      .append(StringUtils.humanReadableInt(storeSize))
+      .append(". This selection was in queue for ")
+      .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
+      .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
+      .append(" to execute.");
+    LOG.info(message.toString());
+    return sfs;
   }
 
   @Override
   public void compactRecentForTesting(int N) throws IOException {
     List<StoreFile> filesToCompact;
-    long maxId;
     boolean isMajor;
 
     this.lock.readLock().lock();
@@ -1156,7 +1176,6 @@ public class HStore implements Store, St
         }
 
         filesToCompact = filesToCompact.subList(count - N, count);
-        maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
         isMajor = (filesToCompact.size() == storefiles.size());
         filesCompacting.addAll(filesToCompact);
         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
@@ -1167,12 +1186,14 @@ public class HStore implements Store, St
 
     try {
       // Ready to go. Have list of files to compact.
-      StoreFile.Writer writer =
-        this.compactor.compact(this, filesToCompact, isMajor, maxId);
-      // Move the compaction into place.
-      StoreFile sf = completeCompaction(filesToCompact, writer);
-      if (region.getCoprocessorHost() != null) {
-        region.getCoprocessorHost().postCompact(this, sf);
+      List<Path> newFiles =
+        this.compactor.compact(filesToCompact, isMajor);
+      for (Path newFile: newFiles) {
+        // Move the compaction into place.
+        StoreFile sf = completeCompaction(filesToCompact, newFile);
+        if (region.getCoprocessorHost() != null) {
+          region.getCoprocessorHost().postCompact(this, sf);
+        }
       }
     } finally {
       synchronized (filesCompacting) {
@@ -1336,26 +1357,25 @@ public class HStore implements Store, St
    * </pre>
    *
    * @param compactedFiles list of files that were compacted
-   * @param compactedFile StoreFile that is the result of the compaction
+   * @param newFile StoreFile that is the result of the compaction
    * @return StoreFile created. May be null.
    * @throws IOException
    */
   StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
-                                       final StoreFile.Writer compactedFile)
+                                       final Path newFile)
       throws IOException {
     // 1. Moving the new files into place -- if there is a new file (may not
     // be if all cells were expired or deleted).
     StoreFile result = null;
-    if (compactedFile != null) {
-      validateStoreFile(compactedFile.getPath());
+    if (newFile != null) {
+      validateStoreFile(newFile);
       // Move the file into the right spot
-      Path origPath = compactedFile.getPath();
-      Path destPath = new Path(homedir, origPath.getName());
-      LOG.info("Renaming compacted file at " + origPath + " to " + destPath);
-      if (!fs.rename(origPath, destPath)) {
-        LOG.error("Failed move of compacted file " + origPath + " to " +
+      Path destPath = new Path(homedir, newFile.getName());
+      LOG.info("Renaming compacted file at " + newFile + " to " + destPath);
+      if (!fs.rename(newFile, destPath)) {
+        LOG.error("Failed move of compacted file " + newFile + " to " +
             destPath);
-        throw new IOException("Failed move of compacted file " + origPath +
+        throw new IOException("Failed move of compacted file " + newFile +
             " to " + destPath);
       }
       result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
@@ -1939,6 +1959,14 @@ public class HStore implements Store, St
   }
 
   /**
+   * Set scan info, used by test
+   * @param scanInfo new scan info to use for test
+   */
+  void setScanInfo(ScanInfo scanInfo) {
+    this.scanInfo = scanInfo;
+  }
+
+  /**
    * Immutable information for scans over a store.
    */
   public static class ScanInfo {

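The HStore changes above reduce to one idea: the store no longer owns the compaction algorithm. It asks CompactionPolicy.create() for a policy, uses that policy's Compactor, and treats the result of a compaction as a list of files rather than a single one. A small caller-side sketch of that wiring, grounded in the signatures in this patch but not a drop-in HBase utility:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.regionserver.HStore;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
    import org.apache.hadoop.hbase.regionserver.compactions.Compactor;

    public class PolicyWiringSketch {
      // Mirrors the HStore constructor above: the policy owns its compactor.
      static Compactor compactorFor(HStore store, Configuration conf) throws IOException {
        CompactionPolicy policy = CompactionPolicy.create(store, conf);
        return policy.getCompactor();
      }
    }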
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1440737&r1=1440736&r2=1440737&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Wed Jan 30 23:53:42 2013
@@ -1744,7 +1744,7 @@ public class StoreFile {
       return reader.getTrailer().getMajorVersion();
     }
 
-    HFile.Reader getHFileReader() {
+    public HFile.Reader getHFileReader() {
       return reader;
     }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1440737&r1=1440736&r2=1440737&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Wed Jan 30 23:53:42 2013
@@ -19,391 +19,133 @@
 
 package org.apache.hadoop.hbase.regionserver.compactions;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.StoreConfiguration;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.StoreUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.util.StringUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.List;
-import java.util.Random;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
- * The default (and only, as of now) algorithm for selecting files for compaction.
- * Combines the compaction configuration and the provisional file selection that
- * it's given to produce the list of suitable candidates for compaction.
+ * A compaction policy determines how to select files for compaction,
+ * how to compact them, and how to generate the compacted files.
  */
 @InterfaceAudience.Private
-public class CompactionPolicy {
+public abstract class CompactionPolicy extends Configured {
+
+  /**
+   * The name of the configuration parameter that specifies
+   * the class of a compaction policy that is used to compact
+   * HBase store files.
+   */
+  public static final String COMPACTION_POLICY_KEY =
+    "hbase.hstore.compaction.policy";
 
-  private static final Log LOG = LogFactory.getLog(CompactionPolicy.class);
-  private final static Calendar calendar = new GregorianCalendar();
+  private static final Class<? extends CompactionPolicy>
+    DEFAULT_COMPACTION_POLICY_CLASS = DefaultCompactionPolicy.class;
 
   CompactionConfiguration comConf;
-  StoreConfiguration storeConfig;
-
-  public CompactionPolicy(Configuration configuration, StoreConfiguration storeConfig) {
-    updateConfiguration(configuration, storeConfig);
-  }
+  Compactor compactor;
+  HStore store;
 
   /**
    * @param candidateFiles candidate files, ordered from oldest to newest
    * @return subset copy of candidate list that meets compaction criteria
    * @throws java.io.IOException
    */
-  public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
-      boolean isUserCompaction, boolean forceMajor)
-    throws IOException {
-    // Prelimanry compaction subject to filters
-    CompactSelection candidateSelection = new CompactSelection(candidateFiles);
-    long cfTtl = this.storeConfig.getStoreFileTtl();
-    if (!forceMajor) {
-      // If there are expired files, only select them so that compaction deletes them
-      if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
-        CompactSelection expiredSelection = selectExpiredStoreFiles(
-          candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
-        if (expiredSelection != null) {
-          return expiredSelection;
-        }
-      }
-      candidateSelection = skipLargeFiles(candidateSelection);
-    }
-
-    // Force a major compaction if this is a user-requested major compaction,
-    // or if we do not have too many files to compact and this was requested
-    // as a major compaction.
-    // Or, if there are any references among the candidates.
-    boolean majorCompaction = (
-      (forceMajor && isUserCompaction)
-      || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
-          && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
-      || StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
-      );
-
-    if (!majorCompaction) {
-      // we're doing a minor compaction, let's see what files are applicable
-      candidateSelection = filterBulk(candidateSelection);
-      candidateSelection = applyCompactionPolicy(candidateSelection);
-      candidateSelection = checkMinFilesCriteria(candidateSelection);
-    }
-    candidateSelection =
-        removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
-    return candidateSelection;
-  }
+  public abstract CompactSelection selectCompaction(
+    final List<StoreFile> candidateFiles, final boolean isUserCompaction,
+    final boolean forceMajor) throws IOException;
 
   /**
-   * Updates the compaction configuration. Used for tests.
-   * TODO: replace when HBASE-3909 is completed in some form.
+   * @param filesToCompact Files to compact. Can be null.
+   * @return True if we should run a major compaction.
    */
-  public void updateConfiguration(Configuration configuration,
-      StoreConfiguration storeConfig) {
-    this.comConf = new CompactionConfiguration(configuration, storeConfig);
-    this.storeConfig = storeConfig;
-  }
+  public abstract boolean isMajorCompaction(
+    final List<StoreFile> filesToCompact) throws IOException;
 
   /**
-   * Select the expired store files to compact
-   *
-   * @param candidates the initial set of storeFiles
-   * @param maxExpiredTimeStamp
-   *          The store file will be marked as expired if its max time stamp is
-   *          less than this maxExpiredTimeStamp.
-   * @return A CompactSelection contains the expired store files as
-   *         filesToCompact
+   * @param compactionSize Total size of some compaction
+   * @return whether this should be a large or small compaction
    */
-  private CompactSelection selectExpiredStoreFiles(
-      CompactSelection candidates, long maxExpiredTimeStamp) {
-    List<StoreFile> filesToCompact = candidates.getFilesToCompact();
-    if (filesToCompact == null || filesToCompact.size() == 0)
-      return null;
-    ArrayList<StoreFile> expiredStoreFiles = null;
-    boolean hasExpiredStoreFiles = false;
-    CompactSelection expiredSFSelection = null;
-
-    for (StoreFile storeFile : filesToCompact) {
-      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
-        LOG.info("Deleting the expired store file by compaction: "
-            + storeFile.getPath() + " whose maxTimeStamp is "
-            + storeFile.getReader().getMaxTimestamp()
-            + " while the max expired timestamp is " + maxExpiredTimeStamp);
-        if (!hasExpiredStoreFiles) {
-          expiredStoreFiles = new ArrayList<StoreFile>();
-          hasExpiredStoreFiles = true;
-        }
-        expiredStoreFiles.add(storeFile);
-      }
-    }
-
-    if (hasExpiredStoreFiles) {
-      expiredSFSelection = new CompactSelection(expiredStoreFiles);
-    }
-    return expiredSFSelection;
-  }
+  public abstract boolean throttleCompaction(long compactionSize);
 
   /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * exclude all files above maxCompactSize
-   * Also save all references. We MUST compact them
+   * @param numCandidates Number of candidate store files
+   * @return whether a compactionSelection is possible
    */
-  private CompactSelection skipLargeFiles(CompactSelection candidates) {
-    int pos = 0;
-    while (pos < candidates.getFilesToCompact().size() &&
-      candidates.getFilesToCompact().get(pos).getReader().length() >
-        comConf.getMaxCompactSize() &&
-      !candidates.getFilesToCompact().get(pos).isReference()) {
-      ++pos;
-    }
-    if (pos > 0) {
-      LOG.debug("Some files are too large. Excluding " + pos
-          + " files from compaction candidates");
-      candidates.clearSubList(0, pos);
-    }
-    return candidates;
-  }
+  public abstract boolean needsCompaction(int numCandidates);
 
   /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * exclude all bulk load files if configured
+   * Inform the policy that some configuration has been changed,
+   * so any cached values should be refreshed.
    */
-  private CompactSelection filterBulk(CompactSelection candidates) {
-    candidates.getFilesToCompact().removeAll(Collections2.filter(
-        candidates.getFilesToCompact(),
-        new Predicate<StoreFile>() {
-          @Override
-          public boolean apply(StoreFile input) {
-            return input.excludeFromMinorCompaction();
-          }
-        }));
-    return candidates;
-  }
-
-  /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * take upto maxFilesToCompact from the start
-   */
-  private CompactSelection removeExcessFiles(CompactSelection candidates,
-      boolean isUserCompaction, boolean isMajorCompaction) {
-    int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
-    if (excess > 0) {
-      if (isMajorCompaction && isUserCompaction) {
-        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
-            " files because of a user-requested major compaction");
-      } else {
-        LOG.debug("Too many admissible files. Excluding " + excess
-          + " files from compaction candidates");
-        candidates.clearSubList(comConf.getMaxFilesToCompact(),
-          candidates.getFilesToCompact().size());
-      }
+  public void updateConfiguration() {
+    if (getConf() != null && store != null) {
+      comConf = new CompactionConfiguration(getConf(), store);
     }
-    return candidates;
   }
+
   /**
-   * @param candidates pre-filtrate
-   * @return filtered subset
-   * forget the compactionSelection if we don't have enough files
+   * Get the compactor for this policy
+   * @return the compactor for this policy
    */
-  private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
-    int minFiles = comConf.getMinFilesToCompact();
-    if (candidates.getFilesToCompact().size() < minFiles) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Not compacting files because we only have " +
-            candidates.getFilesToCompact().size() +
-          " files ready for compaction.  Need " + minFiles + " to initiate.");
-      }
-      candidates.emptyFileList();
-    }
-    return candidates;
+  public Compactor getCompactor() {
+    return compactor;
   }
 
   /**
-    * @param candidates pre-filtrate
-    * @return filtered subset
-    * -- Default minor compaction selection algorithm:
-    * choose CompactSelection from candidates --
-    * First exclude bulk-load files if indicated in configuration.
-    * Start at the oldest file and stop when you find the first file that
-    * meets compaction criteria:
-    * (1) a recently-flushed, small file (i.e. <= minCompactSize)
-    * OR
-    * (2) within the compactRatio of sum(newer_files)
-    * Given normal skew, any newer files will also meet this criteria
-    * <p/>
-    * Additional Note:
-    * If fileSizes.size() >> maxFilesToCompact, we will recurse on
-    * compact().  Consider the oldest files first to avoid a
-    * situation where we always compact [end-threshold,end).  Then, the
-    * last file becomes an aggregate of the previous compactions.
-    *
-    * normal skew:
-    *
-    *         older ----> newer (increasing seqID)
-    *     _
-    *    | |   _
-    *    | |  | |   _
-    *  --|-|- |-|- |-|---_-------_-------  minCompactSize
-    *    | |  | |  | |  | |  _  | |
-    *    | |  | |  | |  | | | | | |
-    *    | |  | |  | |  | | | | | |
-    */
-  CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
-    if (candidates.getFilesToCompact().isEmpty()) {
-      return candidates;
-    }
-
-    // we're doing a minor compaction, let's see what files are applicable
-    int start = 0;
-    double ratio = comConf.getCompactionRatio();
-    if (isOffPeakHour() && candidates.trySetOffpeak()) {
-      ratio = comConf.getCompactionRatioOffPeak();
-      LOG.info("Running an off-peak compaction, selection ratio = " + ratio
-          + ", numOutstandingOffPeakCompactions is now "
-          + CompactSelection.getNumOutStandingOffPeakCompactions());
-    }
-
-    // get store file sizes for incremental compacting selection.
-    int countOfFiles = candidates.getFilesToCompact().size();
-    long[] fileSizes = new long[countOfFiles];
-    long[] sumSize = new long[countOfFiles];
-    for (int i = countOfFiles - 1; i >= 0; --i) {
-      StoreFile file = candidates.getFilesToCompact().get(i);
-      fileSizes[i] = file.getReader().length();
-      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
-      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
-      sumSize[i] = fileSizes[i]
-        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
-        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
-    }
-
-
-    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
-      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
-          (long) (sumSize[start + 1] * ratio))) {
-      ++start;
-    }
-    if (start < countOfFiles) {
-      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
-        + " files from " + countOfFiles + " candidates");
-    }
-
-    candidates = candidates.getSubList(start, countOfFiles);
-
-    return candidates;
-  }
-
-  /*
-   * @param filesToCompact Files to compact. Can be null.
-   * @return True if we should run a major compaction.
+   * Set the new configuration
    */
-  public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
-      throws IOException {
-    boolean result = false;
-    long mcTime = getNextMajorCompactTime(filesToCompact);
-    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
-      return result;
-    }
-    // TODO: Use better method for determining stamp of last major (HBASE-2990)
-    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
-    long now = System.currentTimeMillis();
-    if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
-      // Major compaction time has elapsed.
-      long cfTtl = this.storeConfig.getStoreFileTtl();
-      if (filesToCompact.size() == 1) {
-        // Single file
-        StoreFile sf = filesToCompact.get(0);
-        Long minTimestamp = sf.getMinimumTimestamp();
-        long oldest = (minTimestamp == null)
-            ? Long.MIN_VALUE
-            : now - minTimestamp.longValue();
-        if (sf.isMajorCompaction() &&
-            (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Skipping major compaction of " + this +
-                " because one (major) compacted file only and oldestTime " +
-                oldest + "ms is < ttl=" + cfTtl);
-          }
-        } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
-          LOG.debug("Major compaction triggered on store " + this +
-            ", because keyvalues outdated; time since last major compaction " +
-            (now - lowTimestamp) + "ms");
-          result = true;
-        }
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Major compaction triggered on store " + this +
-              "; time since last major compaction " + (now - lowTimestamp) + "ms");
-        }
-        result = true;
-      }
-    }
-    return result;
-  }
-
-  public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
-    // default = 24hrs
-    long ret = comConf.getMajorCompactionPeriod();
-    if (ret > 0) {
-      // default = 20% = +/- 4.8 hrs
-      double jitterPct = comConf.getMajorCompactionJitter();
-      if (jitterPct > 0) {
-        long jitter = Math.round(ret * jitterPct);
-        // deterministic jitter avoids a major compaction storm on restart
-        Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
-        if (seed != null) {
-          double rnd = (new Random(seed)).nextDouble();
-          ret += jitter - Math.round(2L * jitter * rnd);
-        } else {
-          ret = 0; // no storefiles == no major compaction
-        }
-      }
-    }
-    return ret;
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    updateConfiguration();
   }
 
   /**
-   * @param compactionSize Total size of some compaction
-   * @return whether this should be a large or small compaction
+   * Upon construction, this method will be called with the HStore
+   * to be governed. It will be called once and only once.
    */
-  public boolean throttleCompaction(long compactionSize) {
-    return compactionSize > comConf.getThrottlePoint();
+  protected void configureForStore(HStore store) {
+    this.store = store;
+    updateConfiguration();
   }
 
   /**
-   * @param numCandidates Number of candidate store files
-   * @return whether a compactionSelection is possible
+   * Create the CompactionPolicy configured for the given HStore.
+   * @param store
+   * @param conf
+   * @return a CompactionPolicy
+   * @throws IOException
    */
-  public boolean needsCompaction(int numCandidates) {
-    return numCandidates > comConf.getMinFilesToCompact();
+  public static CompactionPolicy create(HStore store,
+      Configuration conf) throws IOException {
+    Class<? extends CompactionPolicy> clazz =
+      getCompactionPolicyClass(store.getFamily(), conf);
+    CompactionPolicy policy = ReflectionUtils.newInstance(clazz, conf);
+    policy.configureForStore(store);
+    return policy;
   }
 
-  /**
-   * @return whether this is off-peak hour
-   */
-  private boolean isOffPeakHour() {
-    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
-    int startHour = comConf.getOffPeakStartHour();
-    int endHour = comConf.getOffPeakEndHour();
-    // If offpeak time checking is disabled just return false.
-    if (startHour == endHour) {
-      return false;
-    }
-    if (startHour < endHour) {
-      return (currentHour >= startHour && currentHour < endHour);
+  static Class<? extends CompactionPolicy> getCompactionPolicyClass(
+      HColumnDescriptor family, Configuration conf) throws IOException {
+    String className = conf.get(COMPACTION_POLICY_KEY,
+      DEFAULT_COMPACTION_POLICY_CLASS.getName());
+
+    try {
+      Class<? extends CompactionPolicy> clazz =
+        Class.forName(className).asSubclass(CompactionPolicy.class);
+      return clazz;
+    } catch (Exception  e) {
+      throw new IOException(
+        "Unable to load configured region compaction policy '"
+        + className + "' for column '" + family.getNameAsString()
+        + "'", e);
     }
-    return (currentHour >= startHour || currentHour < endHour);
   }
-}
\ No newline at end of file
+}

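For illustration, a custom policy only has to implement the four abstract hooks above and hand back a Compactor. The sketch below is hypothetical (MyCompactionPolicy is not part of this commit) and assumes it lives in the org.apache.hadoop.hbase.regionserver.compactions package, since comConf, compactor and the DefaultCompactor constructor are package-private at this revision; it reuses the default compactor and a deliberately trivial selection.

    package org.apache.hadoop.hbase.regionserver.compactions;

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.regionserver.StoreFile;

    public class MyCompactionPolicy extends CompactionPolicy {

      public MyCompactionPolicy() {
        // Reuse the default rewrite algorithm; only selection differs.
        compactor = new DefaultCompactor(this);
      }

      @Override
      public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
          boolean isUserCompaction, boolean forceMajor) throws IOException {
        // Deliberately naive: offer every candidate for compaction.
        return new CompactSelection(candidateFiles);
      }

      @Override
      public boolean isMajorCompaction(List<StoreFile> filesToCompact) throws IOException {
        return false; // never promote to a major compaction
      }

      @Override
      public boolean throttleCompaction(long compactionSize) {
        return compactionSize > comConf.getThrottlePoint();
      }

      @Override
      public boolean needsCompaction(int numCandidates) {
        return numCandidates > comConf.getMinFilesToCompact();
      }
    }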
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1440737&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Wed Jan 30 23:53:42 2013
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+/**
+ * A compactor is a compaction algorithm associated with a given policy.
+ */
+@InterfaceAudience.Private
+public abstract class Compactor {
+
+  CompactionProgress progress;
+  CompactionPolicy policy;
+
+  Compactor(final CompactionPolicy policy) {
+    this.policy = policy;
+  }
+
+  /**
+   * Do a minor/major compaction on an explicit set of storefiles from a Store.
+   *
+   * @param filesToCompact which files to compact
+   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+   * @return Product of compaction or an empty list if all cells expired or deleted and
+   * nothing made it through the compaction.
+   * @throws IOException
+   */
+  public abstract List<Path> compact(final Collection<StoreFile> filesToCompact,
+    final boolean majorCompaction) throws IOException;
+
+  public Configuration getConf() {
+    return policy.getConf();
+  }
+
+  public CompactionProgress getProgress() {
+    return this.progress;
+  }
+}

Propchange: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
------------------------------------------------------------------------------
    svn:eol-style = native

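The companion extension point is Compactor: given a set of store files, produce zero or more new file paths. A hypothetical skeleton follows (MyCompactor is an illustrative name, placed in the same package because the Compactor constructor is package-private at this revision); a policy would instantiate it from its own constructor, as DefaultCompactionPolicy does below with DefaultCompactor.

    package org.apache.hadoop.hbase.regionserver.compactions;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.StoreFile;

    class MyCompactor extends Compactor {

      MyCompactor(final CompactionPolicy policy) {
        super(policy);
      }

      @Override
      public List<Path> compact(final Collection<StoreFile> filesToCompact,
          final boolean majorCompaction) throws IOException {
        // A real implementation would scan filesToCompact, write one or more new
        // files (e.g. via the store's now-public createWriterInTmp), and return
        // their paths. An empty list signals that nothing survived the compaction.
        return new ArrayList<Path>();
      }
    }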
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java?rev=1440737&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java Wed Jan 30 23:53:42 2013
@@ -0,0 +1,393 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
+/**
+ * The default algorithm for selecting files for compaction.
+ * Combines the compaction configuration and the provisional file selection that
+ * it's given to produce the list of suitable candidates for compaction.
+ */
+@InterfaceAudience.Private
+public class DefaultCompactionPolicy extends CompactionPolicy {
+
+  private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class);
+  private final static Calendar calendar = new GregorianCalendar();
+
+  public DefaultCompactionPolicy() {
+    compactor = new DefaultCompactor(this);
+  }
+
+  /**
+   * @param candidateFiles candidate files, ordered from oldest to newest
+   * @return subset copy of candidate list that meets compaction criteria
+   * @throws java.io.IOException
+   */
+  public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
+      boolean isUserCompaction, boolean forceMajor)
+    throws IOException {
+    // Preliminary compaction subject to filters
+    CompactSelection candidateSelection = new CompactSelection(candidateFiles);
+    long cfTtl = this.store.getStoreFileTtl();
+    if (!forceMajor) {
+      // If there are expired files, only select them so that compaction deletes them
+      if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
+        CompactSelection expiredSelection = selectExpiredStoreFiles(
+          candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
+        if (expiredSelection != null) {
+          return expiredSelection;
+        }
+      }
+      candidateSelection = skipLargeFiles(candidateSelection);
+    }
+
+    // Force a major compaction if this is a user-requested major compaction,
+    // or if we do not have too many files to compact and this was requested
+    // as a major compaction.
+    // Or, if there are any references among the candidates.
+    boolean majorCompaction = (
+      (forceMajor && isUserCompaction)
+      || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
+          && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
+      || StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
+      );
+
+    if (!majorCompaction) {
+      // we're doing a minor compaction, let's see what files are applicable
+      candidateSelection = filterBulk(candidateSelection);
+      candidateSelection = applyCompactionPolicy(candidateSelection);
+      candidateSelection = checkMinFilesCriteria(candidateSelection);
+    }
+    candidateSelection =
+        removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
+    return candidateSelection;
+  }
+
+  /**
+   * Select the expired store files to compact
+   *
+   * @param candidates the initial set of storeFiles
+   * @param maxExpiredTimeStamp
+   *          The store file will be marked as expired if its max time stamp is
+   *          less than this maxExpiredTimeStamp.
+   * @return A CompactSelection contains the expired store files as
+   *         filesToCompact
+   */
+  private CompactSelection selectExpiredStoreFiles(
+      CompactSelection candidates, long maxExpiredTimeStamp) {
+    List<StoreFile> filesToCompact = candidates.getFilesToCompact();
+    if (filesToCompact == null || filesToCompact.size() == 0)
+      return null;
+    ArrayList<StoreFile> expiredStoreFiles = null;
+    boolean hasExpiredStoreFiles = false;
+    CompactSelection expiredSFSelection = null;
+
+    for (StoreFile storeFile : filesToCompact) {
+      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
+        LOG.info("Deleting the expired store file by compaction: "
+            + storeFile.getPath() + " whose maxTimeStamp is "
+            + storeFile.getReader().getMaxTimestamp()
+            + " while the max expired timestamp is " + maxExpiredTimeStamp);
+        if (!hasExpiredStoreFiles) {
+          expiredStoreFiles = new ArrayList<StoreFile>();
+          hasExpiredStoreFiles = true;
+        }
+        expiredStoreFiles.add(storeFile);
+      }
+    }
+
+    if (hasExpiredStoreFiles) {
+      expiredSFSelection = new CompactSelection(expiredStoreFiles);
+    }
+    return expiredSFSelection;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * exclude all files above maxCompactSize
+   * Also save all references. We MUST compact them
+   */
+  private CompactSelection skipLargeFiles(CompactSelection candidates) {
+    int pos = 0;
+    while (pos < candidates.getFilesToCompact().size() &&
+      candidates.getFilesToCompact().get(pos).getReader().length() >
+        comConf.getMaxCompactSize() &&
+      !candidates.getFilesToCompact().get(pos).isReference()) {
+      ++pos;
+    }
+    if (pos > 0) {
+      LOG.debug("Some files are too large. Excluding " + pos
+          + " files from compaction candidates");
+      candidates.clearSubList(0, pos);
+    }
+    return candidates;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * exclude all bulk load files if configured
+   */
+  private CompactSelection filterBulk(CompactSelection candidates) {
+    candidates.getFilesToCompact().removeAll(Collections2.filter(
+        candidates.getFilesToCompact(),
+        new Predicate<StoreFile>() {
+          @Override
+          public boolean apply(StoreFile input) {
+            return input.excludeFromMinorCompaction();
+          }
+        }));
+    return candidates;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * take up to maxFilesToCompact from the start
+   */
+  private CompactSelection removeExcessFiles(CompactSelection candidates,
+      boolean isUserCompaction, boolean isMajorCompaction) {
+    int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
+    if (excess > 0) {
+      if (isMajorCompaction && isUserCompaction) {
+        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
+            " files because of a user-requested major compaction");
+      } else {
+        LOG.debug("Too many admissible files. Excluding " + excess
+          + " files from compaction candidates");
+        candidates.clearSubList(comConf.getMaxFilesToCompact(),
+          candidates.getFilesToCompact().size());
+      }
+    }
+    return candidates;
+  }
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * forget the compactionSelection if we don't have enough files
+   */
+  private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
+    int minFiles = comConf.getMinFilesToCompact();
+    if (candidates.getFilesToCompact().size() < minFiles) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Not compacting files because we only have " +
+            candidates.getFilesToCompact().size() +
+          " files ready for compaction.  Need " + minFiles + " to initiate.");
+      }
+      candidates.emptyFileList();
+    }
+    return candidates;
+  }
+
+  /**
+    * @param candidates pre-filtrate
+    * @return filtered subset
+    * -- Default minor compaction selection algorithm:
+    * choose CompactSelection from candidates --
+    * First exclude bulk-load files if indicated in configuration.
+    * Start at the oldest file and stop when you find the first file that
+    * meets compaction criteria:
+    * (1) a recently-flushed, small file (i.e. <= minCompactSize)
+    * OR
+    * (2) within the compactRatio of sum(newer_files)
+    * Given normal skew, any newer files will also meet this criteria
+    * <p/>
+    * Additional Note:
+    * If fileSizes.size() >> maxFilesToCompact, we will recurse on
+    * compact().  Consider the oldest files first to avoid a
+    * situation where we always compact [end-threshold,end).  Then, the
+    * last file becomes an aggregate of the previous compactions.
+    *
+    * normal skew:
+    *
+    *         older ----> newer (increasing seqID)
+    *     _
+    *    | |   _
+    *    | |  | |   _
+    *  --|-|- |-|- |-|---_-------_-------  minCompactSize
+    *    | |  | |  | |  | |  _  | |
+    *    | |  | |  | |  | | | | | |
+    *    | |  | |  | |  | | | | | |
+    */
+  CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
+    if (candidates.getFilesToCompact().isEmpty()) {
+      return candidates;
+    }
+
+    // we're doing a minor compaction, let's see what files are applicable
+    int start = 0;
+    double ratio = comConf.getCompactionRatio();
+    if (isOffPeakHour() && candidates.trySetOffpeak()) {
+      ratio = comConf.getCompactionRatioOffPeak();
+      LOG.info("Running an off-peak compaction, selection ratio = " + ratio
+          + ", numOutstandingOffPeakCompactions is now "
+          + CompactSelection.getNumOutStandingOffPeakCompactions());
+    }
+
+    // get store file sizes for incremental compacting selection.
+    int countOfFiles = candidates.getFilesToCompact().size();
+    long[] fileSizes = new long[countOfFiles];
+    long[] sumSize = new long[countOfFiles];
+    for (int i = countOfFiles - 1; i >= 0; --i) {
+      StoreFile file = candidates.getFilesToCompact().get(i);
+      fileSizes[i] = file.getReader().length();
+      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
+      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
+      sumSize[i] = fileSizes[i]
+        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
+        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
+    }
+
+
+    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
+      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
+          (long) (sumSize[start + 1] * ratio))) {
+      ++start;
+    }
+    if (start < countOfFiles) {
+      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
+        + " files from " + countOfFiles + " candidates");
+    }
+
+    candidates = candidates.getSubList(start, countOfFiles);
+
+    return candidates;
+  }
+
+  /*
+   * @param filesToCompact Files to compact. Can be null.
+   * @return True if we should run a major compaction.
+   */
+  public boolean isMajorCompaction(final List<StoreFile> filesToCompact)
+      throws IOException {
+    boolean result = false;
+    long mcTime = getNextMajorCompactTime(filesToCompact);
+    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
+      return result;
+    }
+    // TODO: Use better method for determining stamp of last major (HBASE-2990)
+    long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
+    long now = System.currentTimeMillis();
+    if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
+      // Major compaction time has elapsed.
+      long cfTtl = this.store.getStoreFileTtl();
+      if (filesToCompact.size() == 1) {
+        // Single file
+        StoreFile sf = filesToCompact.get(0);
+        Long minTimestamp = sf.getMinimumTimestamp();
+        long oldest = (minTimestamp == null)
+            ? Long.MIN_VALUE
+            : now - minTimestamp.longValue();
+        if (sf.isMajorCompaction() &&
+            (cfTtl == HConstants.FOREVER || oldest < cfTtl)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skipping major compaction of " + this +
+                " because one (major) compacted file only and oldestTime " +
+                oldest + "ms is < ttl=" + cfTtl);
+          }
+        } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) {
+          LOG.debug("Major compaction triggered on store " + this +
+            ", because keyvalues outdated; time since last major compaction " +
+            (now - lowTimestamp) + "ms");
+          result = true;
+        }
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Major compaction triggered on store " + this +
+              "; time since last major compaction " + (now - lowTimestamp) + "ms");
+        }
+        result = true;
+      }
+    }
+    return result;
+  }
+
+  public long getNextMajorCompactTime(final List<StoreFile> filesToCompact) {
+    // default = 24hrs
+    long ret = comConf.getMajorCompactionPeriod();
+    if (ret > 0) {
+      // default = 20% = +/- 4.8 hrs
+      double jitterPct = comConf.getMajorCompactionJitter();
+      if (jitterPct > 0) {
+        long jitter = Math.round(ret * jitterPct);
+        // deterministic jitter avoids a major compaction storm on restart
+        Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact);
+        if (seed != null) {
+          double rnd = (new Random(seed)).nextDouble();
+          ret += jitter - Math.round(2L * jitter * rnd);
+        } else {
+          ret = 0; // no storefiles == no major compaction
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * @param compactionSize Total size of some compaction
+   * @return whether this should be a large or small compaction
+   */
+  public boolean throttleCompaction(long compactionSize) {
+    return compactionSize > comConf.getThrottlePoint();
+  }
+
+  /**
+   * @param numCandidates Number of candidate store files
+   * @return whether a compactionSelection is possible
+   */
+  public boolean needsCompaction(int numCandidates) {
+    return numCandidates > comConf.getMinFilesToCompact();
+  }
+
+  /**
+   * @return whether this is off-peak hour
+   */
+  private boolean isOffPeakHour() {
+    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+    int startHour = comConf.getOffPeakStartHour();
+    int endHour = comConf.getOffPeakEndHour();
+    // If offpeak time checking is disabled just return false.
+    if (startHour == endHour) {
+      return false;
+    }
+    if (startHour < endHour) {
+      return (currentHour >= startHour && currentHour < endHour);
+    }
+    return (currentHour >= startHour || currentHour < endHour);
+  }
+}
\ No newline at end of file

Propchange: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

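DefaultCompactionPolicy's minor-compaction selection above boils down to one test: walk from the oldest file and skip each file whose size exceeds max(minCompactSize, ratio * sum of the newer files that would join it in the compaction). The toy, self-contained re-implementation below (all numbers illustrative) can help reason about which files the default policy would pick:

    public class RatioSelectionDemo {
      // Mirrors applyCompactionPolicy above: skip the oldest files while each is
      // larger than max(minCompactSize, ratio * size of the newer files that
      // would be compacted along with it).
      static int selectionStart(long[] fileSizes, int minFiles, int maxFiles,
          long minCompactSize, double ratio) {
        int count = fileSizes.length;
        long[] sumSize = new long[count + 1];
        for (int i = count - 1; i >= 0; --i) {
          int tooFar = i + maxFiles - 1;
          sumSize[i] = fileSizes[i] + sumSize[i + 1]
              - (tooFar < count ? fileSizes[tooFar] : 0);
        }
        int start = 0;
        while (count - start >= minFiles
            && fileSizes[start] > Math.max(minCompactSize,
                (long) (sumSize[start + 1] * ratio))) {
          ++start;
        }
        return start;
      }

      public static void main(String[] args) {
        // Oldest to newest, sizes in bytes. With ratio 1.2 the 100MB file is
        // skipped (100MB > 1.2 * 30MB) and the remaining four files are selected.
        long[] sizes = {100L << 20, 12L << 20, 10L << 20, 5L << 20, 3L << 20};
        System.out.println("selection starts at index "
            + selectionStart(sizes, 3, 10, 8L << 20, 1.2));
      }
    }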
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java?rev=1440737&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java Wed Jan 30 23:53:42 2013
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Compact passed set of files.
+ * Create an instance and then call {@link #compact(Collection, boolean)}.
+ */
+@InterfaceAudience.Private
+class DefaultCompactor extends Compactor {
+  private static final Log LOG = LogFactory.getLog(DefaultCompactor.class);
+
+  DefaultCompactor(final CompactionPolicy policy) {
+    super(policy);
+  }
+
+  /**
+   * Do a minor/major compaction on an explicit set of storefiles from a Store.
+   *
+   * @param filesToCompact which files to compact
+   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
+   * @return Product of the compaction, or an empty list if all cells were expired or
+   * deleted and nothing made it through the compaction.
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  public List<Path> compact(final Collection<StoreFile> filesToCompact,
+      final boolean majorCompaction) throws IOException {
+    // Max-sequenceID is the last key in the files we're compacting
+    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
+
+    // Calculate maximum key count after compaction (for blooms)
+    // Also calculate earliest put timestamp if major compaction
+    int maxKeyCount = 0;
+    HStore store = policy.store;
+    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
+    for (StoreFile file: filesToCompact) {
+      StoreFile.Reader r = file.getReader();
+      if (r == null) {
+        LOG.warn("Null reader for " + file.getPath());
+        continue;
+      }
+      // NOTE: getFilterEntries could cause under-sized blooms if the user
+      // switches bloom type (e.g. from ROW to ROWCOL)
+      long keyCount = (r.getBloomFilterType() == store.getFamily().getBloomFilterType())?
+        r.getFilterEntries() : r.getEntries();
+      maxKeyCount += keyCount;
+      // For major compactions calculate the earliest put timestamp of all
+      // involved storefiles. This is used to remove family delete markers during
+      // compaction.
+      if (majorCompaction) {
+        byte [] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        if (tmp == null) {
+          // There's a file with no information; it must be an old one,
+          // so assume we have very old puts
+          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+        } else {
+          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Compacting " + file +
+          ", keycount=" + keyCount +
+          ", bloomtype=" + r.getBloomFilterType().toString() +
+          ", size=" + StringUtils.humanReadableInt(r.length()) +
+          ", encoding=" + r.getHFileReader().getEncodingOnDisk() +
+          (majorCompaction? ", earliestPutTs=" + earliestPutTs: ""));
+      }
+    }
+
+    // keep track of compaction progress
+    this.progress = new CompactionProgress(maxKeyCount);
+
+    // For each file, obtain a scanner:
+    List<StoreFileScanner> scanners = StoreFileScanner
+      .getScannersForStoreFiles(filesToCompact, false, false, true);
+
+    // Get some configs; compactionKVMax (default 10) caps KVs per scanner.next() to avoid OOME
+    int compactionKVMax = getConf().getInt(HConstants.COMPACTION_KV_MAX, 10);
+    Compression.Algorithm compression = store.getFamily().getCompression();
+    // Avoid overriding compression setting for major compactions if the user
+    // has not specified it separately
+    Compression.Algorithm compactionCompression =
+      (store.getFamily().getCompactionCompression() != Compression.Algorithm.NONE) ?
+      store.getFamily().getCompactionCompression(): compression;
+    // Make the instantiation lazy in case compaction produces no product; i.e.
+    // where all source cells are expired or deleted.
+    StoreFile.Writer writer = null;
+    List<Path> newFiles = new ArrayList<Path>();
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = store.getHRegion().getSmallestReadPoint();
+    MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
+    try {
+      InternalScanner scanner = null;
+      try {
+        if (store.getHRegion().getCoprocessorHost() != null) {
+          scanner = store
+              .getHRegion()
+              .getCoprocessorHost()
+              .preCompactScannerOpen(store, scanners,
+                  majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
+        }
+        ScanType scanType = majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT;
+        if (scanner == null) {
+          Scan scan = new Scan();
+          scan.setMaxVersions(store.getFamily().getMaxVersions());
+          /* Include deletes, unless we are doing a major compaction */
+          scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
+            scanType, smallestReadPoint, earliestPutTs);
+        }
+        if (store.getHRegion().getCoprocessorHost() != null) {
+          InternalScanner cpScanner =
+            store.getHRegion().getCoprocessorHost().preCompact(store, scanner, scanType);
+          // NULL scanner returned from coprocessor hooks means skip normal processing
+          if (cpScanner == null) {
+            return newFiles;  // an empty list
+          }
+          scanner = cpScanner;
+        }
+
+        int bytesWritten = 0;
+        // Since scanner.next() can return 'false' but still be delivering data,
+        // we have to use a do/while loop.
+        List<KeyValue> kvs = new ArrayList<KeyValue>();
+        // Bytes to write between periodic checks for a requested store close (see isInterrupted)
+        int closeCheckInterval = HStore.getCloseCheckInterval();
+        boolean hasMore;
+        do {
+          hasMore = scanner.next(kvs, compactionKVMax);
+          // Create the writer even if there are no kvs (an empty store file is
+          // also ok), because we need to record the max seq id for the store
+          // file; see HBASE-6059
+          if (writer == null) {
+            writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
+          }
+          if (writer != null) {
+            // output to writer:
+            for (KeyValue kv : kvs) {
+              if (kv.getMemstoreTS() <= smallestReadPoint) {
+                kv.setMemstoreTS(0);
+              }
+              writer.append(kv);
+              // update progress per key
+              ++progress.currentCompactedKVs;
+
+              // check periodically to see if a system stop is requested
+              if (closeCheckInterval > 0) {
+                bytesWritten += kv.getLength();
+                if (bytesWritten > closeCheckInterval) {
+                  bytesWritten = 0;
+                  isInterrupted(store, writer);
+                }
+              }
+            }
+          }
+          kvs.clear();
+        } while (hasMore);
+      } finally {
+        if (scanner != null) {
+          scanner.close();
+        }
+      }
+    } finally {
+      if (writer != null) {
+        writer.appendMetadata(maxId, majorCompaction);
+        writer.close();
+        newFiles.add(writer.getPath());
+      }
+    }
+    return newFiles;
+  }
+
+  void isInterrupted(final HStore store, final StoreFile.Writer writer)
+  throws IOException {
+    if (store.getHRegion().areWritesEnabled()) return;
+    // Else cleanup.
+    writer.close();
+    store.getFileSystem().delete(writer.getPath(), false);
+    throw new InterruptedIOException("Aborting compaction of store " + store +
+      " in region " + store.getHRegion() + " because it was interrupted.");
+  }
+}

Propchange: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
------------------------------------------------------------------------------
    svn:eol-style = native

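One detail in DefaultCompactor.compact() above that is easy to miss: the per-file key count that sizes the bloom filter of the compacted file comes from getFilterEntries() only when the file's bloom type still matches the family's configured type; otherwise the raw entry count is used, per the ROW-to-ROWCOL note. A hedged, self-contained sketch of that decision (the enum, FileStats holder, and numbers below are illustrative stand-ins for StoreFile.Reader and HColumnDescriptor):

import java.util.Arrays;
import java.util.List;

public class BloomSizingSketch {

  enum BloomType { NONE, ROW, ROWCOL }

  /** Illustrative stand-in for the StoreFile.Reader fields DefaultCompactor consults. */
  static class FileStats {
    final BloomType bloomType;
    final long filterEntries;  // entries counted by the file's existing bloom filter
    final long entries;        // total KeyValues in the file
    FileStats(BloomType bloomType, long filterEntries, long entries) {
      this.bloomType = bloomType;
      this.filterEntries = filterEntries;
      this.entries = entries;
    }
  }

  /** Sums the key-count estimate used to size the compacted file's bloom filter. */
  static long estimateMaxKeyCount(List<FileStats> files, BloomType familyType) {
    long maxKeyCount = 0;
    for (FileStats f : files) {
      // Matching bloom type: the filter's own entry count is the tighter estimate.
      // Mismatched type (e.g. the family switched ROW -> ROWCOL): fall back to the
      // raw entry count so the new bloom filter is not under-sized.
      maxKeyCount += (f.bloomType == familyType) ? f.filterEntries : f.entries;
    }
    return maxKeyCount;
  }

  public static void main(String[] args) {
    List<FileStats> files = Arrays.asList(
        new FileStats(BloomType.ROW, 1000, 5000),   // type matches: contributes 1000
        new FileStats(BloomType.NONE, 0, 2000));    // mismatch: contributes 2000
    System.out.println(estimateMaxKeyCount(files, BloomType.ROW));  // prints 3000
  }
}
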
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java?rev=1440737&r1=1440736&r2=1440737&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/HFileReadWriteTest.java Wed Jan 30 23:53:42 2013
@@ -407,7 +407,7 @@ public class HFileReadWriteTest {
       Scan scan = new Scan();
 
       // Include deletes
-      scanner = new StoreScanner(store, store.scanInfo, scan, scanners,
+      scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
           ScanType.MAJOR_COMPACT, Long.MIN_VALUE, Long.MIN_VALUE);
 
       ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1440737&r1=1440736&r2=1440737&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Wed Jan 30 23:53:42 2013
@@ -280,11 +280,11 @@ public class TestCompaction extends HBas
     final int ttl = 1000;
     for (Store hstore : this.r.stores.values()) {
       HStore store = ((HStore) hstore);
-      HStore.ScanInfo old = store.scanInfo;
+      HStore.ScanInfo old = store.getScanInfo();
       HStore.ScanInfo si = new HStore.ScanInfo(old.getFamily(),
           old.getMinVersions(), old.getMaxVersions(), ttl,
           old.getKeepDeletedCells(), 0, old.getComparator());
-      store.scanInfo = si;
+      store.setScanInfo(si);
     }
     Thread.sleep(1000);
 
@@ -301,7 +301,7 @@ public class TestCompaction extends HBas
     conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
 
     HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
-    s.compactionPolicy.updateConfiguration(conf, s);
+    s.compactionPolicy.setConf(conf);
     try {
       createStoreFile(r);
       createStoreFile(r);
@@ -313,7 +313,7 @@ public class TestCompaction extends HBas
       assertEquals(2, s.getStorefilesCount());
 
       // ensure that major compaction time is deterministic
-      CompactionPolicy c = s.compactionPolicy;
+      DefaultCompactionPolicy c = (DefaultCompactionPolicy)s.compactionPolicy;
       List<StoreFile> storeFiles = s.getStorefiles();
       long mcTime = c.getNextMajorCompactTime(storeFiles);
       for (int i = 0; i < 10; ++i) {
@@ -539,11 +539,11 @@ public class TestCompaction extends HBas
       final int ttl = 1000;
       for (Store hstore: this.r.stores.values()) {
         HStore store = (HStore)hstore;
-        HStore.ScanInfo old = store.scanInfo;
+        HStore.ScanInfo old = store.getScanInfo();
         HStore.ScanInfo si = new HStore.ScanInfo(old.getFamily(),
             old.getMinVersions(), old.getMaxVersions(), ttl,
             old.getKeepDeletedCells(), 0, old.getComparator());
-        store.scanInfo = si;
+        store.setScanInfo(si);
       }
       Thread.sleep(ttl);
 
@@ -588,15 +588,15 @@ public class TestCompaction extends HBas
     HStore store = (HStore) r.getStore(COLUMN_FAMILY);
 
     List<StoreFile> storeFiles = store.getStorefiles();
-    long maxId = StoreFile.getMaxSequenceIdInList(storeFiles, true);
-    Compactor tool = new Compactor(this.conf);
+    Compactor tool = store.compactionPolicy.getCompactor();
 
-    StoreFile.Writer compactedFile =
-      tool.compact(store, storeFiles, false, maxId);
+    List<Path> newFiles =
+      tool.compact(storeFiles, false);
 
     // Now lets corrupt the compacted file.
     FileSystem fs = FileSystem.get(conf);
-    Path origPath = compactedFile.getPath();
+    // default compaction policy created one and only one new compacted file
+    Path origPath = newFiles.get(0);
     Path homedir = store.getHomedir();
     Path dstPath = new Path(homedir, origPath.getName());
     FSDataOutputStream stream = fs.create(origPath, null, true, 512, (short) 3,
@@ -606,7 +606,7 @@ public class TestCompaction extends HBas
     stream.close();
 
     try {
-      store.completeCompaction(storeFiles, compactedFile);
+      store.completeCompaction(storeFiles, origPath);
     } catch (Exception e) {
       // The complete compaction should fail and the corrupt file should remain
       // in the 'tmp' directory;

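The TestCompaction hunk above also shows the shape of the new pluggable API: the compactor is obtained from the store's compaction policy, and compact() returns a list of paths rather than a single writer (the default policy happens to produce exactly one file). A rough usage sketch follows; the class name, package placement (needed for the package-private compactionPolicy field, as in the test), and null-on-empty convention are assumptions, not part of the commit:

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;

/** Sketch only: the pluggable-compactor call pattern TestCompaction exercises. */
public class CompactorUsageSketch {

  static Path compactOnce(HStore store) throws IOException {
    // The compactor now comes from the store's (pluggable) compaction policy.
    Compactor tool = store.compactionPolicy.getCompactor();
    List<StoreFile> storeFiles = store.getStorefiles();
    // false = minor compaction; true would also prune deletes and excess versions.
    List<Path> newFiles = tool.compact(storeFiles, false);
    // The default policy writes one and only one compacted file.
    return newFiles.isEmpty() ? null : newFiles.get(0);
  }
}
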
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java?rev=1440737&r1=1440736&r2=1440737&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java Wed Jan 30 23:53:42 2013
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -58,7 +57,6 @@ public class TestDefaultCompactSelection
   private static final String DIR=
     TEST_UTIL.getDataTestDir(TestDefaultCompactSelection.class.getSimpleName()).toString();
   private static Path TEST_FILE;
-  private CompactionPolicy manager;
 
   protected static final int minFiles = 3;
   protected static final int maxFiles = 5;
@@ -84,7 +82,6 @@ public class TestDefaultCompactSelection
     Path basedir = new Path(DIR);
     String logName = "logs";
     Path logdir = new Path(DIR, logName);
-    Path oldLogDir = new Path(basedir, HConstants.HREGION_OLDLOGDIR_NAME);
     HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("family"));
     FileSystem fs = FileSystem.get(conf);
 
@@ -102,7 +99,6 @@ public class TestDefaultCompactSelection
     region = new HRegion(tableDir, hlog, fs, conf, info, htd, null);
 
     store = new HStore(basedir, region, hcd, fs, conf);
-    manager = store.compactionPolicy;
 
     TEST_FILE = StoreFile.getRandomFilename(fs, store.getHomedir());
     fs.create(TEST_FILE);
@@ -282,7 +278,7 @@ public class TestDefaultCompactSelection
     compactEquals(sfCreate(100,50,23,12,12), true, 23, 12, 12);
     conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 1);
     conf.setFloat("hbase.hregion.majorcompaction.jitter", 0);
-    store.compactionPolicy.updateConfiguration(conf, store);
+    store.compactionPolicy.updateConfiguration();
     try {
       // trigger an aged major compaction
       compactEquals(sfCreate(50,25,12,12), 50, 25, 12, 12);
@@ -313,7 +309,7 @@ public class TestDefaultCompactSelection
      * current compaction algorithm.  Developed to ensure that refactoring
      * doesn't implicitly alter this.
      */
-    long tooBig = maxSize + 1;
+    //long tooBig = maxSize + 1;
 
     Calendar calendar = new GregorianCalendar();
     int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
@@ -334,13 +330,13 @@ public class TestDefaultCompactSelection
     this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
     LOG.debug("Testing compact selection with off-peak settings (" +
         hourMinusOne + ", " + hourPlusOne + ")");
-    store.compactionPolicy.updateConfiguration(this.conf, store);
+    store.compactionPolicy.updateConfiguration();
     compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1);
 
     // set peak hour outside current selection and check compact selection
     this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
     this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
-    store.compactionPolicy.updateConfiguration(this.conf, store);
+    store.compactionPolicy.updateConfiguration();
     LOG.debug("Testing compact selection with off-peak settings (" +
         hourMinusTwo + ", " + hourMinusOne + ")");
     compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);

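For reference, the off-peak window toggled by the selection test above is plain configuration; isOffPeakHour() in DefaultCompactionPolicy treats startHour == endHour as disabled and otherwise lets the window wrap past midnight. A small sketch setting a 22:00-06:00 window (the key names are the ones the test uses; everything else is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class OffPeakConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Off-peak window from 22:00 to 06:00; since start > end the policy uses
    // (currentHour >= start || currentHour < end), i.e. the window wraps midnight.
    conf.setLong("hbase.offpeak.start.hour", 22);
    conf.setLong("hbase.offpeak.end.hour", 6);
    // Setting start == end would disable off-peak handling entirely.
    System.out.println(conf.getLong("hbase.offpeak.start.hour", -1));
  }
}
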
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1440737&r1=1440736&r2=1440737&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Wed Jan 30 23:53:42 2013
@@ -45,7 +45,6 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -203,7 +202,7 @@ public class TestStore extends TestCase 
     hcd.setTimeToLive(ttl);
     init(getName(), conf, hcd);
 
-    long sleepTime = this.store.scanInfo.getTtl() / storeFileNum;
+    long sleepTime = this.store.getScanInfo().getTtl() / storeFileNum;
     long timeStamp;
     // There are 4 store files and the max time stamp difference among these
     // store files will be (this.store.ttl / storeFileNum)
@@ -229,11 +228,12 @@ public class TestStore extends TestCase 
       // If not the first compaction, there is another empty store file,
       assertEquals(Math.min(i, 2), cr.getFiles().size());
       for (int j = 0; i < cr.getFiles().size(); j++) {
-        assertTrue(cr.getFiles().get(j).getReader().getMaxTimestamp() <
-            (EnvironmentEdgeManager.currentTimeMillis() - this.store.scanInfo.getTtl()));
+        assertTrue(cr.getFiles().get(j).getReader().getMaxTimestamp() < (System
+            .currentTimeMillis() - this.store.getScanInfo().getTtl()));
       }
       // Verify that the expired store file is compacted to an empty store file.
-      StoreFile compactedFile = this.store.compact(cr);
+      // The default compaction policy creates one and only one compacted file.
+      StoreFile compactedFile = this.store.compact(cr).get(0);
       // It is an empty store file.
       assertEquals(0, compactedFile.getReader().getEntries());
 

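The TestStore change above reflects the same list-returning convention one level up: HStore.compact(CompactionRequest) now hands back a list of store files, of which the default policy produces exactly one (possibly empty, per HBASE-6059). Roughly, with the same package-visibility assumption as the earlier sketch and a hypothetical helper name:

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

/** Sketch only: the store-level call pattern TestStore exercises. */
public class StoreCompactSketch {

  static StoreFile compactAndGetProduct(HStore store, CompactionRequest cr)
      throws IOException {
    List<StoreFile> products = store.compact(cr);
    // With the default compaction policy there is exactly one product file; it
    // may be empty if every cell was expired or deleted (HBASE-6059).
    return (products == null || products.isEmpty()) ? null : products.get(0);
  }
}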

