hbase-commits mailing list archives

From mbau...@apache.org
Subject svn commit: r1378348 [1/2] - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/mapreduce/ main/java/org/apache/hadoop/hb...
Date Tue, 28 Aug 2012 21:13:39 GMT
Author: mbautin
Date: Tue Aug 28 21:13:38 2012
New Revision: 1378348

URL: http://svn.apache.org/viewvc?rev=1378348&view=rev
Log:
[HBASE-6371] Level Based Compaction

Author: liyintang

Summary: We've already discussed all features here. We'll fill in a detailed summary later.

Test Plan: New tests added for size-based levelAssignment. Also tests for recent-first level traversal and vice versa.

Reviewers: liyintang, kannan

Reviewed By: liyintang

CC: hbase-eng@, mbautin

Differential Revision: https://phabricator.fb.com/D548547

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSelection.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequest.java
      - copied, changed from r1378342, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/UpdateConfigTool.java
    hbase/branches/0.89-fb/src/main/resources/hbase-compactions.xml
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
      - copied, changed from r1378342, hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestTierCompactSelection.java
Removed:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java?rev=1378348&r1=1378347&r2=1378348&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java Tue Aug 28 21:13:38 2012
@@ -64,6 +64,7 @@ public class HBaseConfiguration extends 
     conf.addResource("hbase-default.xml");
     conf.addResource("hbase-site.xml");
     conf.addResource("hbase-site-custom.xml");
+    conf.addResource("hbase-compactions.xml");
     return conf;
   }
 

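For reference, a hypothetical hbase-compactions.xml in the standard Hadoop configuration format. The property names are the ones read by CompactionConfiguration.java later in this commit; the values shown are just the code's own defaults, not recommendations:

    <?xml version="1.0"?>
    <configuration>
      <property>
        <name>hbase.hstore.compaction.ratio</name>
        <value>1.2</value>
      </property>
      <property>
        <name>hbase.hstore.compaction.ratio.offpeak</name>
        <value>5.0</value>
      </property>
      <property>
        <name>hbase.offpeak.start.hour</name>
        <value>-1</value> <!-- start == end disables off-peak handling -->
      </property>
      <property>
        <name>hbase.offpeak.end.hour</name>
        <value>-1</value>
      </property>
    </configuration>
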
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1378348&r1=1378347&r2=1378348&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Tue Aug 28 21:13:38 2012
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.CompactionManager;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -191,6 +192,12 @@ public final class HConstants {
   /** Default region server interface class name. */
   public static final String DEFAULT_REGION_SERVER_CLASS = HRegionInterface.class.getName();
 
+  /** Parameter name for what compaction manager to use. */
+  public static final String COMPACTION_MANAGER_CLASS = "hbase.compactionmanager.class";
+
+  /** Default compaction manager class name. */
+  public static final String DEFAULT_COMPACTION_MANAGER_CLASS = CompactionManager.class.getName();
+
   /** Parameter name for what master implementation to use. */
   public static final String MASTER_IMPL = "hbase.master.impl";
 
@@ -252,6 +259,9 @@ public final class HConstants {
   /** Default maximum file size */
   public static final long DEFAULT_MAX_FILE_SIZE = 256 * 1024 * 1024;
 
+  /** Default value for files without minFlushTime in metadata */
+  public static final long NO_MIN_FLUSH_TIME = -1;
+
   /** Conf key for the memstore size at which we flush the memstore */
   public static final String HREGION_MEMSTORE_FLUSH_SIZE =
       "hbase.hregion.memstore.flush.size";

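A minimal sketch of how the new pluggable-manager knob is meant to be used, assuming the TierCompactionManager class added above; the reflective instantiation happens in Store.setCompactionPolicy(), shown in the Store.java diff below:

    // Sketch: select the tier-based manager instead of the default.
    // Constant and class names are the ones added in this commit.
    Configuration conf = HBaseConfiguration.create();
    conf.set(HConstants.COMPACTION_MANAGER_CLASS,   // "hbase.compactionmanager.class"
        "org.apache.hadoop.hbase.regionserver.TierCompactionManager");
    // Each Store reflectively constructs the named class via its
    // (Configuration, Store) constructor; see setCompactionPolicy() below.
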
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1378348&r1=1378347&r2=1378348&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Aug 28 21:13:38 2012
@@ -24,6 +24,7 @@ import java.io.InterruptedIOException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.RegionException;
@@ -1285,4 +1287,23 @@ public class HBaseAdmin {
       connection.close();
     }
   }
-}
+
+  // Update configuration for all region servers
+  public void updateConfiguration() throws IOException {
+    Collection<HServerInfo> allRegionServers = this.getClusterStatus().getServerInfo();
+    for (HServerInfo serverInfo : allRegionServers) {
+      updateConfiguration(serverInfo.getServerAddress());
+    }
+  }
+
+  // Update configuration for region server at this address
+  public void updateConfiguration(String hostNameWithPort) throws IOException {
+    updateConfiguration(new HServerAddress(hostNameWithPort));
+  }
+
+  private void updateConfiguration(HServerAddress address) throws IOException {
+    HRegionInterface server = connection.getHRegionConnection(address);
+    server.updateConfiguration();
+  }
+
+}
\ No newline at end of file

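A short usage sketch for the new admin API (the host:port below is hypothetical; in this branch region servers are addressed by HServerAddress):

    // Reload configuration files on every region server in the cluster:
    HBaseAdmin admin = new HBaseAdmin(conf);
    admin.updateConfiguration();

    // Or push the update to a single region server:
    admin.updateConfiguration("rs1.example.com:60020");
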
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1378348&r1=1378347&r2=1378348&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Aug 28 21:13:38 2012
@@ -390,5 +390,9 @@ public interface HRegionInterface extend
   public int updateFavoredNodes(AssignmentPlan plan)
   throws IOException;
 
+  /**
+   * Update the configuration.
+   */
+  public void updateConfiguration() throws IOException;
 
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java?rev=1378348&r1=1378347&r2=1378348&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat.java Tue Aug 28 21:13:38 2012
@@ -201,10 +201,10 @@ public class HFileOutputFormat extends F
 
           /* Set maxSequenceId to be 0 for bulk imported files since
            * these files do not correspond to any edit log items.
-           *
            * Set majorCompaction flag to be false for bulk import file.
+           * For now, bulk load files don't have minFlushTime.
            */
-          w.appendMetadata(0, false);
+          w.appendMetadata(HConstants.NO_MIN_FLUSH_TIME, 0, false);
           w.close();
         }
       }

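The appendMetadata() signature now takes the minimum flush time as its first argument. Bulk-loaded files never passed through a memstore flush, so they get the sentinel; real flushes (see the Store.java diff below) stamp the current time. The two call sites, side by side:

    // Bulk import: no flush time, sequence id 0, not a major compaction.
    w.appendMetadata(HConstants.NO_MIN_FLUSH_TIME, 0, false);
    // Memstore flush: record the flush wall-clock time alongside the sequence id.
    writer.appendMetadata(EnvironmentEdgeManager.currentTimeMillis(), logCacheFlushId, false);
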
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSelection.java?rev=1378348&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSelection.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSelection.java Tue Aug 28 21:13:38 2012
@@ -0,0 +1,133 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+public class CompactSelection {
+
+  private static final long serialVersionUID = 1L;
+
+  /** A unique ID for each compaction, used in logs */
+  private static long counter = 0;
+  private final long compactSelectionID;
+  private final long selectionTime;
+
+  static final Log LOG = LogFactory.getLog(CompactSelection.class);
+  // the actual list - this is needed to handle methods like "sublist" 
+  // correctly
+  List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
+  // number of off peak compactions either in the compaction queue or  
+  // happening now
+  public static Integer numOutstandingOffPeakCompactions = 0;
+  // was this compaction promoted to an off-peak
+  boolean isOffPeakCompaction = false;
+
+  public CompactSelection(List<StoreFile> filesToCompact) {
+    this.filesToCompact = filesToCompact;
+    this.compactSelectionID = counter;
+    this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
+    counter++;
+  }
+
+  long getCompactSelectionID() {
+    return compactSelectionID;
+  }
+
+
+  long getSelectionTime() {
+    return selectionTime;
+  }
+
+  /**
+   * The current compaction finished, so decrement the off-peak compactions count
+   * if this was an off peak compaction.
+   */
+  public void finishRequest() {
+    if (isOffPeakCompaction) {
+      synchronized(numOutstandingOffPeakCompactions) {
+        numOutstandingOffPeakCompactions--;
+        isOffPeakCompaction = false;
+      }
+      LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " + 
+          numOutstandingOffPeakCompactions);
+    }
+  }
+  
+  public List<StoreFile> getFilesToCompact() {
+    return filesToCompact;
+  }
+  
+  /**
+   * Removes all files from the current compaction list, and decrements the
+   * off-peak compaction count if this was an off-peak compaction.
+   */
+  public void emptyFileList() {
+    filesToCompact.clear();
+    if (isOffPeakCompaction) {
+      synchronized(numOutstandingOffPeakCompactions) {
+        // reset the off peak count
+        numOutstandingOffPeakCompactions--;
+        isOffPeakCompaction = false;
+      }
+      LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " + 
+          numOutstandingOffPeakCompactions);
+    }
+  }
+  
+  public boolean isOffPeakCompaction() {
+    return this.isOffPeakCompaction;
+  }
+
+  void setOffPeak() {
+    synchronized (numOutstandingOffPeakCompactions) {
+      numOutstandingOffPeakCompactions++;
+      isOffPeakCompaction = true;
+    }
+  }
+
+  static int getNumOutStandingOffPeakCompactions() {
+    synchronized (numOutstandingOffPeakCompactions) {
+      return numOutstandingOffPeakCompactions;
+    }
+  }
+  
+  public CompactSelection subList(int start, int end) {
+    throw new UnsupportedOperationException();
+  }
+  
+  public CompactSelection getSubList(int start, int end) {
+    filesToCompact = filesToCompact.subList(start, end);
+    return this;
+  }
+
+  public void clearSubList(int start, int end) {
+    filesToCompact.subList(start, end).clear();
+  }
+
+}

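One caveat for readers of CompactSelection: numOutstandingOffPeakCompactions is a boxed Integer, and ++/-- rebinds the field to a different Integer object, so the synchronized blocks above can end up locking different monitors. A minimal alternative sketch, assuming no other semantics are needed, would use an atomic counter:

    // Hypothetical variant (not part of this commit): an atomic counter instead
    // of synchronizing on a field that autoboxing reassigns.
    import java.util.concurrent.atomic.AtomicInteger;

    static final AtomicInteger numOutstandingOffPeakCompactions = new AtomicInteger(0);

    void setOffPeak() {
      numOutstandingOffPeakCompactions.incrementAndGet();
      isOffPeakCompaction = true;
    }

    public void finishRequest() {
      if (isOffPeakCompaction) {
        isOffPeakCompaction = false;
        numOutstandingOffPeakCompactions.decrementAndGet();
      }
    }

    static int getNumOutStandingOffPeakCompactions() {
      return numOutstandingOffPeakCompactions.get();
    }
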
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1378348&r1=1378347&r2=1378348&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Aug 28 21:13:38 2012
@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 
 import com.google.common.base.Preconditions;
 

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java?rev=1378348&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionConfiguration.java Tue Aug 28 21:13:38 2012
@@ -0,0 +1,182 @@
+/**
+ * Copyright 2012 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+
+/**
+ * Control knobs for the default compaction algorithm:
+ * <p/>
+ * maxCompactSize - upper bound on file size to be included in minor compactions
+ * minCompactSize - lower bound below which compaction is selected without ratio test
+ * shouldExcludeBulk - whether to exclude bulk import files from minor compactions
+ * minFilesToCompact - lower bound on number of files in any minor compaction
+ * maxFilesToCompact - upper bound on number of files in any minor compaction
+ * compactionRatio - Ratio used for compaction
+ * <p/>
+ * Set parameter as "hbase.hstore.compaction.<attribute>"
+ */
+
+//TODO: revisit this class for online parameter updating
+
+public class CompactionConfiguration {
+
+  static final Log LOG = LogFactory.getLog(CompactionManager.class);
+
+  Configuration conf;
+  Store store;
+
+  long maxCompactSize;
+  long minCompactSize;
+  boolean shouldExcludeBulk;
+  int minFilesToCompact;
+  int maxFilesToCompact;
+  double compactionRatio;
+  double offPeakCompactionRatio;
+  int offPeakStartHour;
+  int offPeakEndHour;
+  long throttlePoint;
+  boolean shouldDeleteExpired;
+  long majorCompactionPeriod;
+  float majorCompactionJitter;
+
+  CompactionConfiguration(Configuration conf, Store store) {
+    this.conf = conf;
+    this.store = store;
+
+    String strPrefix = "hbase.hstore.compaction.";
+
+    maxCompactSize = conf.getLong(strPrefix + "max.size", Long.MAX_VALUE);
+    minCompactSize = conf.getLong(strPrefix + "min.size", store.getHRegion().memstoreFlushSize);
+    shouldExcludeBulk = conf.getBoolean(strPrefix + "exclude.bulk", false);
+    minFilesToCompact = Math.max(2, conf.getInt(strPrefix + "min",
+          /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
+    maxFilesToCompact = conf.getInt(strPrefix + "max", 10);
+    compactionRatio = conf.getFloat(strPrefix + "ratio", 1.2F);
+    offPeakCompactionRatio = conf.getFloat(strPrefix + "ratio.offpeak", 5.0F);
+
+    offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
+    offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
+
+    throttlePoint =  conf.getLong("hbase.regionserver.thread.compaction.throttle",
+          2 * maxFilesToCompact * store.getHRegion().memstoreFlushSize);
+    shouldDeleteExpired = conf.getBoolean("hbase.store.delete.expired.storefile", true);
+    majorCompactionPeriod = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
+    majorCompactionJitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.20F);
+  }
+
+  /**
+   * @return lower bound below which compaction is selected without ratio test
+   */
+  long getMinCompactSize() {
+    return minCompactSize;
+  }
+
+  /**
+   * @return upper bound on file size to be included in minor compactions
+   */
+  long getMaxCompactSize() {
+    return maxCompactSize;
+  }
+
+  /**
+   * @return lower bound on number of files to be included in minor compactions
+   */
+  int getMinFilesToCompact() {
+    return minFilesToCompact;
+  }
+
+  /**
+   * @return upper bound on number of files to be included in minor compactions
+   */
+  int getMaxFilesToCompact() {
+    return maxFilesToCompact;
+  }
+
+  /**
+   * @return whether to exclude bulk import files from minor compactions
+   */
+  boolean shouldExcludeBulk() {
+    return shouldExcludeBulk;
+  }
+
+  /**
+   * @return Ratio used for compaction
+   */
+  double getCompactionRatio() {
+    return compactionRatio;
+  }
+
+  /**
+   * @return Off peak Ratio used for compaction
+   */
+  double getCompactionRatioOffPeak() {
+    return offPeakCompactionRatio;
+  }
+
+  /**
+   * @return Hour at which off-peak compactions start
+   */
+  int getOffPeakStartHour() {
+    return offPeakStartHour;
+  }
+
+  /**
+   * @return Hour at which off-peak compactions end
+   */
+  int getOffPeakEndHour() {
+    return offPeakEndHour;
+  }
+
+  /**
+   * @return ThrottlePoint used for classifying small and large compactions
+   */
+  long getThrottlePoint() {
+    return throttlePoint;
+  }
+
+  /**
+   * @return Major compaction period.
+   * Major compactions are selected periodically according to this parameter plus jitter
+   */
+  long getMajorCompactionPeriod() {
+    return majorCompactionPeriod;
+  }
+
+  /**
+   * @return The jitter fraction: the major compaction period is randomly
+   *  varied within this fraction of majorCompactionPeriod in each store.
+   */
+  float getMajorCompactionJitter() {
+    return majorCompactionJitter;
+  }
+
+  /**
+   * @return Whether expired files should be deleted ASAP using compactions
+   */
+  boolean shouldDeleteExpired() {
+    return shouldDeleteExpired;
+  }
+
+}
\ No newline at end of file

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java?rev=1378348&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionManager.java Tue Aug 28 21:13:38 2012
@@ -0,0 +1,396 @@
+/**
+ * Copyright 2012 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.Random;
+
+public class CompactionManager {
+
+  private static final Log LOG = LogFactory.getLog(CompactionManager.class);
+  private final static Calendar calendar = new GregorianCalendar();
+
+  private Store store;
+  CompactionConfiguration comConf;
+
+  CompactionManager(Configuration configuration, Store store) {
+    this.store = store;
+    comConf = new CompactionConfiguration(configuration, store);
+  }
+
+  /**
+   * @param candidateFiles candidate files, ordered from oldest to newest
+   * @return subset copy of candidate list that meets compaction criteria
+   * @throws java.io.IOException
+   */
+  CompactSelection selectCompaction(List<StoreFile> candidateFiles, boolean forceMajor)
+    throws IOException {
+
+    // Preliminary compaction selection, subject to filters
+    CompactSelection candidateSelection = new CompactSelection(candidateFiles);
+
+    if (!forceMajor) {
+      // If there are expired files, only select them so that compaction deletes them
+      if (comConf.shouldDeleteExpired() && (store.ttl != Long.MAX_VALUE)) {
+        CompactSelection expiredSelection = selectExpiredSFs(
+          candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - store.ttl);
+        if (expiredSelection != null) {
+          return expiredSelection;
+        }
+      }
+      candidateSelection = skipLargeFiles(candidateSelection);
+    }
+
+    // major compact on user action or age (caveat: not if we have too many files)
+    boolean majorCompaction = (
+      (forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact())) &&
+        candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()) ||
+        store.hasReferences(candidateSelection.getFilesToCompact());
+
+    if (!majorCompaction) {
+      // we're doing a minor compaction, let's see what files are applicable
+      candidateSelection = filterBulk(candidateSelection);
+      candidateSelection = applyCompactionPolicy(candidateSelection);
+      candidateSelection = checkMinFilesCriteria(candidateSelection);
+    }
+    candidateSelection = removeExcessFiles(candidateSelection);
+    return candidateSelection;
+  }
+
+  /**
+   * Select the expired store files to compact
+   *
+   * @param candidates the initial set of storeFiles
+   * @param maxExpiredTimeStamp
+   *          The store file will be marked as expired if its max time stamp is
+   *          less than this maxExpiredTimeStamp.
+   * @return A CompactSelection contains the expired store files as
+   *         filesToCompact
+   */
+  private CompactSelection selectExpiredSFs(
+      CompactSelection candidates, long maxExpiredTimeStamp) {
+    if (candidates.filesToCompact == null || candidates.filesToCompact.size() == 0)
+      return null;
+    ArrayList<StoreFile> expiredStoreFiles = null;
+    boolean hasExpiredStoreFiles = false;
+    CompactSelection expiredSFSelection = null;
+
+    for (StoreFile storeFile : candidates.filesToCompact) {
+      if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
+        LOG.info("Deleting the expired store file by compaction: "
+            + storeFile.getPath() + " whose maxTimeStamp is "
+            + storeFile.getReader().getMaxTimestamp()
+            + " while the max expired timestamp is " + maxExpiredTimeStamp);
+        if (!hasExpiredStoreFiles) {
+          expiredStoreFiles = new ArrayList<StoreFile>();
+          hasExpiredStoreFiles = true;
+        }
+        expiredStoreFiles.add(storeFile);
+      }
+    }
+
+    if (hasExpiredStoreFiles) {
+      expiredSFSelection = new CompactSelection(expiredStoreFiles);
+    }
+    return expiredSFSelection;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * exclude all files above maxCompactSize
+   * Also save all references. We MUST compact them
+   */
+  private CompactSelection skipLargeFiles(CompactSelection candidates) {
+    int pos = 0;
+    while (pos < candidates.getFilesToCompact().size() &&
+      candidates.getFilesToCompact().get(pos).getReader().length() >
+        comConf.getMaxCompactSize() &&
+      !candidates.getFilesToCompact().get(pos).isReference()) {
+      ++pos;
+    }
+    if (pos > 0) {
+      LOG.debug("Some files are too large. Excluding " + pos + " files from compaction candidates");
+      candidates.clearSubList(0, pos);
+    }
+    return candidates;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * exclude all bulk load files if configured
+   */
+  private CompactSelection filterBulk(CompactSelection candidates) {
+    if (comConf.shouldExcludeBulk()) {
+      int previous = candidates.getFilesToCompact().size();
+      candidates.getFilesToCompact().removeAll(Collections2.filter(candidates.getFilesToCompact(),
+          new Predicate<StoreFile>() {
+            @Override
+            public boolean apply(StoreFile input) {
+               // If we have assigned a sequenceId to the hfile, we won't skip the file.
+              return input.isBulkLoadResult() && input.getMaxSequenceId() <= 0;
+            }
+          }));
+      LOG.info("Exclude " + (candidates.getFilesToCompact().size() - previous) +
+          " bulk imported store files from compaction candidates");
+    }
+    return candidates;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * take upto maxFilesToCompact from the start
+   */
+  private CompactSelection removeExcessFiles(CompactSelection candidates) {
+    int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
+    if (excess > 0) {
+      LOG.debug("Too many admissible files. Excluding " + excess
+        + " files from compaction candidates");
+      candidates.clearSubList(comConf.getMaxFilesToCompact(),
+        candidates.getFilesToCompact().size());
+    }
+    return candidates;
+  }
+
+  /**
+   * @param candidates pre-filtrate
+   * @return filtered subset
+   * forget the compactionSelection if we don't have enough files
+   */
+  private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
+    if (candidates.getFilesToCompact().size() < comConf.getMinFilesToCompact()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipped minor compaction of " + store + ". No admissible set of files found.");
+      }
+      candidates.emptyFileList();
+    }
+    return candidates;
+  }
+
+  /**
+    * @param candidates pre-filtrate
+    * @return filtered subset
+    * -- Default minor compaction selection algorithm: Choose CompactSelection from candidates --
+    * First exclude bulk-load files if indicated in configuration.
+    * Start at the oldest file and stop when you find the first file that
+    * meets compaction criteria:
+    * (1) a recently-flushed, small file (i.e. <= minCompactSize)
+    * OR
+    * (2) within the compactRatio of sum(newer_files)
+    * Given normal skew, any newer files will also meet this criteria
+    * <p/>
+    * Additional Note:
+    * If fileSizes.size() >> maxFilesToCompact, we will recurse on
+    * compact().  Consider the oldest files first to avoid a
+    * situation where we always compact [end-threshold,end).  Then, the
+    * last file becomes an aggregate of the previous compactions.
+    *
+    * normal skew:
+    *
+    *         older ----> newer (increasing seqID)
+    *     _
+    *    | |   _
+    *    | |  | |   _
+    *  --|-|- |-|- |-|---_-------_-------  minCompactSize
+    *    | |  | |  | |  | |  _  | |
+    *    | |  | |  | |  | | | | | |
+    *    | |  | |  | |  | | | | | |
+    */
+  CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
+    if (candidates.getFilesToCompact().isEmpty()) {
+      return candidates;
+    }
+    // we're doing a minor compaction, let's see what files are applicable
+    int start = 0;
+    double r = comConf.getCompactionRatio();
+
+    if (isOffPeakHour() && !(candidates.getNumOutStandingOffPeakCompactions() > 0)) {
+      r = comConf.getCompactionRatioOffPeak();
+      candidates.setOffPeak();
+      LOG.info("Running an off-peak compaction, selection ratio = " + r
+          + ", numOutstandingOffPeakCompactions is now "
+          + candidates.getNumOutStandingOffPeakCompactions());
+    }
+
+    // get store file sizes for incremental compacting selection.
+    int countOfFiles = candidates.getFilesToCompact().size();
+    long[] fileSizes = new long[countOfFiles];
+    long[] sumSize = new long[countOfFiles];
+    for (int i = countOfFiles - 1; i >= 0; --i) {
+      StoreFile file = candidates.getFilesToCompact().get(i);
+      fileSizes[i] = file.getReader().length();
+      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
+      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
+      sumSize[i] = fileSizes[i]
+        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
+        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
+    }
+
+
+    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
+      fileSizes[start] > Math.max(comConf.getMinCompactSize(), (long) (sumSize[start + 1] * r))) {
+      ++start;
+    }
+    if (start < countOfFiles) {
+      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
+        + " files from " + countOfFiles + " candidates");
+    }
+
+    candidates = candidates.getSubList(start, countOfFiles);
+
+    return candidates;
+  }
+
+  /**
+   * @param filesToCompact Files to compact. Can be null.
+   * @return True if we should run a major compaction.
+   */
+  boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
+
+    boolean result = false;
+    long mcTime = getNextMajorCompactTime();
+    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
+      return result;
+    }
+    long lowTimestamp = getLowestTimestamp(filesToCompact);
+    long now = System.currentTimeMillis();
+    if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
+      // Major compaction time has elapsed.
+      long elapsedTime = now - lowTimestamp;
+      if (filesToCompact.size() == 1 &&
+        filesToCompact.get(0).isMajorCompaction() &&
+        (store.ttl == HConstants.FOREVER || elapsedTime < store.ttl)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping major compaction of " + store
+            + " because one (major) compacted file only and elapsedTime "
+            + elapsedTime + "ms is < ttl=" + store.ttl);
+        }
+      } else if (store.isPeakTime(calendar.get(Calendar.HOUR_OF_DAY))) {
+        LOG.debug("Peak traffic time for HBase, not scheduling any major "
+          + "compactions. Peak hour period is : " + comConf.getOffPeakStartHour() + " - "
+          + comConf.getOffPeakEndHour() + " current hour is : "
+          + calendar.get(Calendar.HOUR_OF_DAY));
+        result = false;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Major compaction triggered on store " + store + "; "
+            + "time since last major compaction " + StringUtils.formatTimeDiff(now, lowTimestamp));
+        }
+        result = true;
+      }
+    }
+    return result;
+  }
+
+  long getNextMajorCompactTime() {
+    // default = 24hrs
+    long ret = comConf.getMajorCompactionPeriod();
+    String strCompactionTime = store.getFamily().getValue(HConstants.MAJOR_COMPACTION_PERIOD);
+    if (strCompactionTime != null) {
+      ret = Long.parseLong(strCompactionTime);
+    }
+
+    if (ret > 0) {
+      // default = 20% = +/- 4.8 hrs
+      double jitterPct = comConf.getMajorCompactionJitter();
+      if (jitterPct > 0) {
+        long jitter = Math.round(ret * jitterPct);
+        // deterministic jitter avoids a major compaction storm on restart
+        Integer seed = store.getDeterministicRandomSeed();
+        if (seed != null) {
+          double rnd = (new Random(seed)).nextDouble();
+          ret += jitter - Math.round(2L * jitter * rnd);
+        } else {
+          ret = 0; // no storefiles == no major compaction
+        }
+      }
+    }
+    return ret;
+  }
+
+  /*
+   * Gets lowest timestamp from candidate StoreFiles
+   *
+   * @param candidates candidate StoreFiles
+   * @throws IOException
+   */
+  static long getLowestTimestamp(final List<StoreFile> candidates)
+    throws IOException {
+    long minTs = Long.MAX_VALUE;
+    for (StoreFile storeFile : candidates) {
+      minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
+    }
+    return minTs;
+  }
+
+  /**
+   * @param compactionSize Total size of some compaction
+   * @return whether this should be a large or small compaction
+   */
+  boolean throttleCompaction(long compactionSize) {
+    return compactionSize > comConf.getThrottlePoint();
+  }
+
+  /**
+   * @param numCandidates Number of candidate store files
+   * @return whether a compactionSelection is possible
+   */
+  boolean needsCompaction(int numCandidates) {
+    return numCandidates > comConf.getMinFilesToCompact();
+  }
+
+  /**
+   * @return whether this is off-peak hour
+   */
+  private boolean isOffPeakHour() {
+    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+    int startHour = comConf.getOffPeakStartHour();
+    int endHour = comConf.getOffPeakEndHour();
+    // If offpeak time checking is disabled just return false.
+    if (startHour == endHour) {
+      return false;
+    }
+    if (startHour < endHour) {
+      return (currentHour >= startHour && currentHour < endHour);
+    }
+    return (currentHour >= startHour || currentHour < endHour);
+  }
+
+  private boolean isValidHour(int hour) {
+    return (hour >= 0 && hour <= 23);
+  }
+
+}
\ No newline at end of file

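To make the ratio test in applyCompactionPolicy() concrete, here is a self-contained sketch with hypothetical file sizes; the ratio, minimum size, and minimum count are the defaults from CompactionConfiguration, and the maxFilesToCompact windowing of sumSize is omitted for brevity:

    // Demo of the default selection loop on made-up sizes (oldest -> newest).
    public class RatioSelectionDemo {
      public static void main(String[] args) {
        long[] fileSizes = {250, 50, 23, 12, 12};
        double r = 1.2;           // hbase.hstore.compaction.ratio
        long minCompactSize = 12; // files at or below this always pass the test
        int minFilesToCompact = 3;
        int n = fileSizes.length;
        long[] sumSize = new long[n + 1]; // sumSize[i] = total size of files i..n-1
        for (int i = n - 1; i >= 0; --i) {
          sumSize[i] = fileSizes[i] + sumSize[i + 1];
        }
        int start = 0;
        // Skip a file while it is larger than ratio * sum(newer files).
        while (n - start >= minFilesToCompact &&
               fileSizes[start] > Math.max(minCompactSize, (long) (sumSize[start + 1] * r))) {
          ++start;
        }
        // 250 > 1.2 * (50+23+12+12) = 116 -> skipped; 50 <= 1.2 * 47 = 56 -> selected.
        System.out.println("Selected " + (n - start) + " files, starting at index " + start);
      }
    }
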
Copied: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequest.java (from r1378342, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequest.java?p2=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequest.java&p1=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java&r1=1378342&r2=1378348&rev=1378348&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequest.java Tue Aug 28 21:13:38 2012
@@ -17,7 +17,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.regionserver.compactions;
+
+package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.Date;
@@ -28,10 +29,6 @@ import java.util.concurrent.ThreadPoolEx
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils;
 
@@ -54,9 +51,9 @@ public class CompactionRequest implement
     private final boolean isMajor;
     private int p;
     private final Date date;
-    private HRegionServer server = null;
+    private HRegionServer server = null;    
 
-    public CompactionRequest(HRegion r, Store s,
+    CompactionRequest(HRegion r, Store s,
         CompactSelection files, boolean isMajor, int p) {
       Preconditions.checkNotNull(r);
       Preconditions.checkNotNull(files);
@@ -74,7 +71,15 @@ public class CompactionRequest implement
       this.date = new Date();
     }
 
-    public void finishRequest() {
+    long getCompactSelectionID() {
+      return compactSelection.getCompactSelectionID();
+    }
+
+    long getSelectionTime() {
+      return compactSelection.getSelectionTime();
+    }
+
+    void finishRequest() {
       this.compactSelection.finishRequest();
     }
 
@@ -112,45 +117,45 @@ public class CompactionRequest implement
     }
 
     /** Gets the HRegion for the request */
-    public HRegion getHRegion() {
+    HRegion getHRegion() {
       return r;
     }
-
+    
     /** Gets the Store for the request */
-    public Store getStore() {
+    Store getStore() {
       return s;
     }
 
     /** Gets the compact selection object for the request */
-    public CompactSelection getCompactSelection() {
+    CompactSelection getCompactSelection() {
       return compactSelection;
     }
-
+    
     /** Gets the StoreFiles for the request */
-    public List<StoreFile> getFiles() {
+    List<StoreFile> getFiles() {
       return compactSelection.getFilesToCompact();
     }
-
+    
     /** Gets the total size of all StoreFiles in compaction */
-    public long getSize() {
+    long getSize() {
       return totalSize;
     }
-
-    public boolean isMajor() {
+    
+    boolean isMajor() {
       return this.isMajor;
     }
 
-    public void setServer(HRegionServer server) {
+    void setServer(HRegionServer server) {
       this.server = server;
     }
 
     /** Gets the priority for the request */
-    public int getPriority() {
+    int getPriority() {
       return p;
     }
 
     /** Gets the priority for the request */
-    public void setPriority(int p) {
+    void setPriority(int p) {
       this.p = p;
     }
 
@@ -165,14 +170,14 @@ public class CompactionRequest implement
             }),
         new Function<StoreFile, String>() {
           public String apply(StoreFile sf) {
-
+            
             return StringUtils.humanReadableInt(sf.getReader().length());
           }
         }));
 
       return "regionName=" + r.getRegionNameAsString() +
         ", storeName=" + new String(s.getFamily().getName()) +
-        ", fileCount=" + compactSelection.getFilesToCompact().size() +
+        ", fileCount=" + compactSelection.getFilesToCompact().size() + 
         ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
           ((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
         ", priority=" + p + ", date=" + date;
@@ -183,7 +188,7 @@ public class CompactionRequest implement
       try {
         if (server.isStopRequested()) {
           return;
-        }
+        } 
         long start = EnvironmentEdgeManager.currentTimeMillis();
         boolean completed = r.compact(this);
         long now = EnvironmentEdgeManager.currentTimeMillis();
@@ -196,7 +201,7 @@ public class CompactionRequest implement
             server.compactSplitThread
               .requestCompaction(r, s, "Recursive enqueue");
           }
-        }
+        } 
       } catch (IOException ex) {
         LOG.error("Compaction failed " + this, RemoteExceptionHandler
             .checkIOException(ex));
@@ -213,14 +218,14 @@ public class CompactionRequest implement
         if (server != null) {
           LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
         }
-      }
+      } 
     }
 
     /**
      * Cleanup class to use when rejecting a compaction request from the queue.
      */
     public static class Rejection implements RejectedExecutionHandler {
-
+  
       @Override
       public void rejectedExecution(Runnable request, ThreadPoolExecutor pool) {
         if (request instanceof CompactionRequest) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1378348&r1=1378347&r2=1378348&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Aug 28 21:13:38 2012
@@ -57,7 +57,6 @@ import java.util.Date;
 import java.util.Calendar;
 import java.text.SimpleDateFormat;
 
-import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -76,7 +75,6 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -84,7 +82,6 @@ import org.apache.hadoop.hbase.client.Mu
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowLock;
-import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
@@ -96,7 +93,6 @@ import org.apache.hadoop.hbase.ipc.HRegi
 import org.apache.hadoop.hbase.metrics.RequestMetrics;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -3650,7 +3646,7 @@ public class HRegion implements HeapSize
   }
 
   /**
-   * @return True if needs a mojor compaction.
+   * @return True if needs a major compaction.
    * @throws IOException
    */
   boolean isMajorCompaction() throws IOException {
@@ -4153,4 +4149,10 @@ public class HRegion implements HeapSize
      }
   }
 
+  void updateConfiguration() {
+    for (Store s : stores.values()) {
+      s.updateConfiguration();
+    }
+  }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1378348&r1=1378347&r2=1378348&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Aug 28 21:13:38 2012
@@ -3460,4 +3460,11 @@ public class HRegionServer implements HR
     return "RS-" + serverInfo.getServerName();
   }
 
+  public void updateConfiguration() {
+    conf.reloadConfiguration();
+    for (HRegion r : onlineRegions.values()) {
+      r.updateConfiguration();
+    }
+  }
+  
 }

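Putting the online-update pieces together, the call chain added across this commit is, in sketch form (abbreviated from the diffs above; the added UpdateConfigTool presumably drives this from the command line):

    // Client:  HBaseAdmin.updateConfiguration()
    //            -> per region server: HRegionInterface.updateConfiguration() (RPC)
    // Server:  HRegionServer.updateConfiguration()
    //            conf.reloadConfiguration();  // re-reads resources, incl. hbase-compactions.xml
    //            for each online HRegion: region.updateConfiguration()
    //              for each Store: store.updateConfiguration()
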
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1378348&r1=1378347&r2=1378348&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Aug 28 21:13:38 2012
@@ -21,14 +21,12 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.Random;
 import java.util.SortedSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -62,8 +60,6 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -72,8 +68,6 @@ import org.apache.hadoop.hbase.util.Envi
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -108,16 +102,13 @@ public class Store extends SchemaConfigu
   private final Path homedir;
   private final HRegion region;
   private final HColumnDescriptor family;
+  CompactionManager compactionManager;
   final FileSystem fs;
   final Configuration conf;
   final CacheConfig cacheConf;
   // ttl in milliseconds.
   protected long ttl;
   private long timeToPurgeDeletes;
-  private final int minFilesToCompact;
-  private final int maxFilesToCompact;
-  private final long minCompactSize;
-  private final long maxCompactSize;
   private long lastCompactSize = 0;
   volatile boolean forceMajor = false;
   /* how many bytes to write between status checks */
@@ -130,7 +121,6 @@ public class Store extends SchemaConfigu
   private final String storeNameStr;
   private int peakStartHour;
   private int peakEndHour;
-  private final static Calendar calendar = new GregorianCalendar();
 
   /*
    * List of store files inside this store. This is an immutable list that
@@ -213,11 +203,6 @@ public class Store extends SchemaConfigu
     this.memstore = new MemStore(this.comparator);
     this.storeNameStr = getColumnFamilyName();
 
-    // By default, compact if storefile.count >= minFilesToCompact
-    this.minFilesToCompact = Math.max(2,
-      conf.getInt("hbase.hstore.compaction.min",
-        /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
-
     // Setting up cache configuration for this family
     this.cacheConf = new CacheConfig(conf, family);
 
@@ -231,12 +216,6 @@ public class Store extends SchemaConfigu
     this.blockingStoreFileCount =
       conf.getInt("hbase.hstore.blockingStoreFiles", -1);
 
-    this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
-    this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
-      this.region.memstoreFlushSize);
-    this.maxCompactSize
-      = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
-
     if (Store.closeCheckInterval == 0) {
       Store.closeCheckInterval = conf.getInt(
           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
@@ -253,6 +232,50 @@ public class Store extends SchemaConfigu
       }
       this.peakStartHour = this.peakEndHour = -1;
     }
+
+    setCompactionPolicy(conf.get(HConstants.COMPACTION_MANAGER_CLASS,
+                                 HConstants.DEFAULT_COMPACTION_MANAGER_CLASS));
+  }
+
+  /**
+   * This setter is used for unit testing
+   * TODO: Fix this for online configuration updating
+   */
+  void setCompactionPolicy(String managerClassName) {
+    try {
+      Class<? extends CompactionManager> managerClass =
+        (Class<? extends CompactionManager>) Class.forName(managerClassName);
+      compactionManager = managerClass.getDeclaredConstructor(
+          new Class[] {Configuration.class, Store.class } ).newInstance(
+          new Object[] { conf, this } );
+    } catch (ClassNotFoundException e) {
+      throw new UnsupportedOperationException(
+          "Unable to find region server interface " + managerClassName, e);
+    } catch (IllegalAccessException e) {
+      throw new UnsupportedOperationException(
+          "Unable to access specified class " + managerClassName, e);
+    } catch (InstantiationException e) {
+      throw new UnsupportedOperationException(
+          "Unable to instantiate specified class " + managerClassName, e);
+    } catch (InvocationTargetException e) {
+      throw new UnsupportedOperationException(
+          "Unable to invoke specified target class constructor " + managerClassName, e);
+    } catch (NoSuchMethodException e) {
+      throw new UnsupportedOperationException(
+          "Unable to find suitable constructor for class " + managerClassName, e);
+    }
+  }
+
+  /**
+   * @return A hash code depending on the state of the current store files.
+   * This is used as seed for deterministic random generator for selecting major compaction time
+   */
+  Integer getDeterministicRandomSeed() {
+    ImmutableList<StoreFile> snapshot = storefiles;
+    if (snapshot != null && !snapshot.isEmpty()) {
+      return snapshot.get(0).getPath().getName().hashCode();
+    }
+    return null;
   }
 
   private boolean isValidHour(int hour) {
@@ -652,9 +675,10 @@ public class Store extends SchemaConfigu
           } while (hasMore);
         } finally {
           // Write out the log sequence number that corresponds to this output
+          // Write current time in metadata as minFlushTime
           // hfile.  The hfile is current up to and including logCacheFlushId.
           status.setStatus("Flushing " + this + ": appending metadata");
-          writer.appendMetadata(logCacheFlushId, false);
+          writer.appendMetadata(EnvironmentEdgeManager.currentTimeMillis(), logCacheFlushId, false);
           status.setStatus("Flushing " + this + ": closing flushed file");
           writer.close();
         }
@@ -684,7 +708,7 @@ public class Store extends SchemaConfigu
     // the flushing through the StoreFlusherImpl class
     getSchemaMetrics().updatePersistentStoreMetric(
         SchemaMetrics.StoreMetricType.FLUSH_SIZE, flushed);
-    if(LOG.isInfoEnabled()) {
+    if (LOG.isInfoEnabled()) {
       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
         ", sequenceid=" + logCacheFlushId +
         ", memsize=" + StringUtils.humanReadableInt(flushed) +
@@ -867,11 +891,11 @@ public class Store extends SchemaConfigu
 
     // Ready to go. Have list of files to compact.
     MonitoredTask status = TaskMonitor.get().createStatus(
-        (cr.isMajor() ? "Major " : "") + "Compaction of "
+        (cr.isMajor() ? "Major " : "") + "Compaction (ID: " + cr.getCompactSelectionID() + ") of "
         + this.storeNameStr + " on "
         + this.region.getRegionInfo().getRegionNameAsString());
-    LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
-        + this.storeNameStr + " of "
+    LOG.info("Starting compaction (ID: " + cr.getCompactSelectionID() + ") of "
+        + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
         + " into " + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
         + StringUtils.humanReadableInt(cr.getSize()));
@@ -879,18 +903,23 @@ public class Store extends SchemaConfigu
     StoreFile sf = null;
     try {
       status.setStatus("Compacting " + filesToCompact.size() + " file(s)");
+      long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
       StoreFile.Writer writer = compactStores(filesToCompact, cr.isMajor(), maxId);
       // Move the compaction into place.
       sf = completeCompaction(filesToCompact, writer);
 
       // Report that the compaction is complete.
       status.markComplete("Completed compaction");
-      LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+      LOG.info("Completed" + (cr.isMajor() ? " major " : " ")
+          + "compaction (ID: " + cr.getCompactSelectionID() + ") of "
           + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
           + this.region.getRegionInfo().getRegionNameAsString()
-          + "; new storefile name=" + (sf == null ? "none" : sf.toString())
-          + ", size=" + (sf == null ? "none" :
-            StringUtils.humanReadableInt(sf.getReader().length()))
+          + "; This selection was in queue for "
+          + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()) + ", and took "
+          + StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
+                                       compactionStartTime)
+          + " to execute. New storefile name=" + (sf == null ? "none" : sf.toString())
+          + ", size=" + (sf == null? "none" : StringUtils.humanReadableInt(sf.getReader().length()))
           + "; total size for store is "
           + StringUtils.humanReadableInt(storeSize));
     } catch (IOException ioe) {
@@ -960,7 +989,7 @@ public class Store extends SchemaConfigu
    * @param files
    * @return True if any of the files in <code>files</code> are References.
    */
-  private boolean hasReferences(Collection<StoreFile> files) {
+  boolean hasReferences(Collection<StoreFile> files) {
     if (files != null && files.size() > 0) {
       for (StoreFile hsf: files) {
         if (hsf.isReference()) {
@@ -972,22 +1001,6 @@ public class Store extends SchemaConfigu
   }
 
   /*
-   * Gets lowest timestamp from candidate StoreFiles
-   *
-   * @param fs
-   * @param dir
-   * @throws IOException
-   */
-  public static long getLowestTimestamp(final List<StoreFile> candidates)
-      throws IOException {
-    long minTs = Long.MAX_VALUE;
-    for (StoreFile storeFile : candidates) {
-      minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
-    }
-    return minTs;
-  }
-
-  /*
    * @return True if we should run a major compaction.
    */
   boolean isMajorCompaction() throws IOException {
@@ -999,16 +1012,7 @@ public class Store extends SchemaConfigu
     }
 
     List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
-
-    // exclude files above the max compaction threshold
-    // except: save all references. we MUST compact them
-    int pos = 0;
-    while (pos < candidates.size() &&
-           candidates.get(pos).getReader().length() > this.maxCompactSize &&
-           !candidates.get(pos).isReference()) ++pos;
-    candidates.subList(0, pos).clear();
-
-    return isMajorCompaction(candidates);
+    return compactionManager.isMajorCompaction(candidates);
   }
 
   boolean isPeakTime(int currentHour) {
@@ -1022,76 +1026,6 @@ public class Store extends SchemaConfigu
     return (currentHour >= this.peakStartHour || currentHour < this.peakEndHour);
   }
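
The visible branch handles a peak window that wraps past midnight
(peakStartHour > peakEndHour). A hedged sketch of the general check, with both
orderings spelled out (illustrative helper, not the committed code):

    static boolean inDailyWindow(int hour, int start, int end) {
      if (start <= end) {
        return hour >= start && hour < end;  // e.g. 9..17
      }
      return hour >= start || hour < end;    // wraps midnight, e.g. 22..4
    }
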
 
-  /*
-   * @param filesToCompact Files to compact. Can be null.
-   * @return True if we should run a major compaction.
-   */
-  private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
-    boolean result = false;
-    long mcTime = getNextMajorCompactTime();
-    if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
-      return result;
-    }
-    long lowTimestamp = getLowestTimestamp(filesToCompact);
-    long now = System.currentTimeMillis();
-    if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
-      // Major compaction time has elapsed.
-      long elapsedTime = now - lowTimestamp;
-      if (filesToCompact.size() == 1 &&
-          filesToCompact.get(0).isMajorCompaction() &&
-          (this.ttl == HConstants.FOREVER || elapsedTime < this.ttl)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping major compaction of " + this.storeNameStr +
-            " because one (major) compacted file only and elapsedTime " +
-            elapsedTime + "ms is < ttl=" + this.ttl);
-        }
-      } else if (isPeakTime(calendar.get(Calendar.HOUR_OF_DAY))) {
-        LOG.debug("Peak traffic time for HBase, not scheduling any major " +
-            "compactions. Peak hour period is : " + this.peakStartHour + " - " +
-            this.peakEndHour + " current hour is : " +
-            calendar.get(Calendar.HOUR_OF_DAY));
-        result = false;
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
-            "; time since last major compaction " +
-            StringUtils.formatTimeDiff(now, lowTimestamp));
-        }
-        result = true;
-      }
-    }
-    return result;
-  }
-
-  long getNextMajorCompactTime() {
-    // default = 24hrs
-    long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
-    if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
-      String strCompactionTime =
-        family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
-      ret = (new Long(strCompactionTime)).longValue();
-    }
-
-    if (ret > 0) {
-      // default = 20% = +/- 4.8 hrs
-      double jitterPct =  conf.getFloat("hbase.hregion.majorcompaction.jitter",
-          0.20F);
-      if (jitterPct > 0) {
-        long jitter = Math.round(ret * jitterPct);
-        // deterministic jitter avoids a major compaction storm on restart
-        ImmutableList<StoreFile> snapshot = storefiles;
-        if (snapshot != null && !snapshot.isEmpty()) {
-          String seed = snapshot.get(0).getPath().getName();
-          double curRand = new Random(seed.hashCode()).nextDouble();
-          ret += jitter - Math.round(2L * jitter * curRand);
-        } else {
-          ret = 0; // no storefiles == no major compaction
-        }
-      }
-    }
-    return ret;
-  }
-
   public CompactionRequest requestCompaction() {
     // don't even select for compaction if writes are disabled
     if (!this.region.areWritesEnabled()) {
@@ -1112,7 +1046,9 @@ public class Store extends SchemaConfigu
           Preconditions.checkArgument(idx != -1);
           candidates.subList(0, idx + 1).clear();
         }
-        CompactSelection filesToCompact = compactSelection(candidates);
+        CompactSelection filesToCompact = compactionManager.selectCompaction(
+            candidates, forceMajor && filesCompacting.isEmpty());
 
         // no files to compact
         if (filesToCompact.getFilesToCompact().isEmpty()) {
@@ -1156,170 +1092,6 @@ public class Store extends SchemaConfigu
   }
 
   /**
-   * Algorithm to choose which files to compact
-   *
-   * Configuration knobs:
-   *  "hbase.hstore.compaction.ratio"
-   *    normal case: minor compact when file <= sum(smaller_files) * ratio
-   *  "hbase.hstore.compaction.min.size"
-   *    unconditionally compact individual files below this size
-   *  "hbase.hstore.compaction.max.size"
-   *    never compact individual files above this size (unless splitting)
-   *  "hbase.hstore.compaction.min"
-   *    min files needed to minor compact
-   *  "hbase.hstore.compaction.max"
-   *    max files to compact at once (avoids OOM)
-   *
-   * @param candidates candidate files, ordered from oldest to newest
-   * @return subset copy of candidate list that meets compaction criteria
-   * @throws IOException
-   */
-  CompactSelection compactSelection(List<StoreFile> candidates)
-      throws IOException {
-    // ASSUMPTION!!! filesCompacting is locked when calling this function
-
-    /* normal skew:
-     *
-     *         older ----> newer
-     *     _
-     *    | |   _
-     *    | |  | |   _
-     *  --|-|- |-|- |-|---_-------_-------  minCompactSize
-     *    | |  | |  | |  | |  _  | |
-     *    | |  | |  | |  | | | | | |
-     *    | |  | |  | |  | | | | | |
-     */
-    CompactSelection compactSelection = new CompactSelection(conf, candidates);
-
-    boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
-    if (!forcemajor) {
-      // Delete the expired store files before the compaction selection.
-      if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
-          && (ttl != Long.MAX_VALUE)) {
-        CompactSelection expiredSelection = compactSelection
-            .selectExpiredStoreFilesToCompact(
-                EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
-
-        // If there is any expired store files, delete them  by compaction.
-        if (expiredSelection != null) {
-          return expiredSelection;
-        }
-      }
-      // do not compact old files above a configurable threshold
-      // save all references. we MUST compact them
-      int pos = 0;
-      while (pos < compactSelection.getFilesToCompact().size() &&
-             compactSelection.getFilesToCompact().get(pos).getReader().length()
-               > maxCompactSize &&
-             !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
-      compactSelection.clearSubList(0, pos);
-    }
-
-    if (compactSelection.getFilesToCompact().isEmpty()) {
-      LOG.debug(this.storeNameStr + ": no store files to compact");
-      compactSelection.emptyFileList();
-      return compactSelection;
-    }
-
-    // major compact on user action or age (caveat: we have too many files)
-    boolean majorcompaction =
-      (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact()))
-      && compactSelection.getFilesToCompact().size() < this.maxFilesToCompact;
-
-    if (!majorcompaction &&
-        !hasReferences(compactSelection.getFilesToCompact())) {
-      // we're doing a minor compaction, let's see what files are applicable
-      int start = 0;
-      double r = compactSelection.getCompactSelectionRatio();
-
-      // exclude bulk import files from minor compactions, if configured
-      if (conf.getBoolean("hbase.hstore.compaction.exclude.bulk", false)) {
-        int previous = compactSelection.getFilesToCompact().size();
-        compactSelection.getFilesToCompact().removeAll(Collections2.filter(
-            compactSelection.getFilesToCompact(),
-            new Predicate<StoreFile>() {
-              @Override
-              public boolean apply(StoreFile input) {
-                // If we have assigned a sequenceId to the hfile, we won't skip the file.
-                return input.isBulkLoadResult() && input.getMaxSequenceId() <= 0;
-              }
-            }));
-        LOG.debug("Exclude " +
-            (compactSelection.getFilesToCompact().size() - previous) +
-            " store files from compaction");
-      }
-
-      // skip selection algorithm if we don't have enough files
-      if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
-        compactSelection.emptyFileList();
-        return compactSelection;
-      }
-
-      /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
-      //sort files by size to correct when normal skew is altered by bulk load
-      Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
-       */
-
-      // get store file sizes for incremental compacting selection.
-      int countOfFiles = compactSelection.getFilesToCompact().size();
-      long [] fileSizes = new long[countOfFiles];
-      long [] sumSize = new long[countOfFiles];
-      for (int i = countOfFiles-1; i >= 0; --i) {
-        StoreFile file = compactSelection.getFilesToCompact().get(i);
-        fileSizes[i] = file.getReader().length();
-        // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
-        int tooFar = i + this.maxFilesToCompact - 1;
-        sumSize[i] = fileSizes[i]
-                   + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
-                   - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
-      }
-
-      /* Start at the oldest file and stop when you find the first file that
-       * meets compaction criteria:
-       *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
-       *      OR
-       *   (2) within the compactRatio of sum(newer_files)
-       * Given normal skew, any newer files will also meet this criteria
-       *
-       * Additional Note:
-       * If fileSizes.size() >> maxFilesToCompact, we will recurse on
-       * compact().  Consider the oldest files first to avoid a
-       * situation where we always compact [end-threshold,end).  Then, the
-       * last file becomes an aggregate of the previous compactions.
-       */
-      while(countOfFiles - start >= this.minFilesToCompact &&
-            fileSizes[start] >
-              Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
-        ++start;
-      }
-      int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
-      long totalSize = fileSizes[start]
-                     + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
-      compactSelection = compactSelection.getSubList(start, end);
-
-      // if we don't have enough files to compact, just wait
-      if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipped compaction of " + this.storeNameStr
-            + ".  Only " + (end - start) + " file(s) of size "
-            + StringUtils.humanReadableInt(totalSize)
-            + " have met compaction criteria.");
-        }
-        compactSelection.emptyFileList();
-        return compactSelection;
-      }
-    } else {
-      // all files included in this compaction, up to max
-      if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
-        int pastMax =
-          compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
-        compactSelection.clearSubList(0, pastMax);
-      }
-    }
-    return compactSelection;
-  }
-
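
The selection loop removed above is easier to follow with the running sums made
explicit. A self-contained sketch of the same ratio test (standalone names, not
the new CompactionManager API); for sizes {100, 30, 20, 10} with ratio 1.2,
minCompactSize 16 and minFiles 3 it skips the 100 and selects the newest three:

    // Pick the first index whose file is small enough relative to the files
    // newer than it: size <= max(minCompactSize, ratio * sum of the next
    // (maxFiles - 1) sizes). sum[i] covers sizes[i .. i + maxFiles - 1).
    static int selectionStart(long[] sizes, double ratio, long minCompactSize,
                              int minFiles, int maxFiles) {
      int n = sizes.length;
      long[] sum = new long[n + 1];
      for (int i = n - 1; i >= 0; --i) {
        int tooFar = i + maxFiles - 1;
        sum[i] = sizes[i] + sum[i + 1] - (tooFar < n ? sizes[tooFar] : 0);
      }
      int start = 0;
      while (n - start >= minFiles && sizes[start] >
             Math.max(minCompactSize, (long) (sum[start + 1] * ratio))) {
        ++start;
      }
      return start;  // compact sizes[start .. min(n, start + maxFiles))
    }
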
-  /**
    * Do a minor/major compaction on an explicit set of storefiles.
    * Uses the scan infrastructure to make it easy.
    *
@@ -1333,9 +1105,13 @@ public class Store extends SchemaConfigu
   StoreFile.Writer compactStores(final Collection<StoreFile> filesToCompact,
                                final boolean majorCompaction, final long maxId)
       throws IOException {
-    // calculate maximum key count after compaction (for blooms)
+    // calculate the maximum key count (for blooms) and the minFlushTime of the compaction output
     long maxKeyCount = 0;
+    long minFlushTime = Long.MAX_VALUE;
     for (StoreFile file : filesToCompact) {
+      if (file.hasMinFlushTime() && file.getMinFlushTime() < minFlushTime) {
+        minFlushTime = file.getMinFlushTime();
+      }
       StoreFile.Reader r = file.getReader();
       if (r != null) {
         // NOTE: getFilterEntries could cause under-sized blooms if the user
@@ -1370,8 +1146,9 @@ public class Store extends SchemaConfigu
         Scan scan = new Scan();
         scan.setMaxVersions(family.getMaxVersions());
         /* include deletes, unless we are doing a major compaction */
-        long retainDeletesUntil = (majorCompaction)?
-          (this.timeToPurgeDeletes <= 0 ? Long.MAX_VALUE: (System.currentTimeMillis() - this.timeToPurgeDeletes))
+        long retainDeletesUntil = (majorCompaction) ?
+          (this.timeToPurgeDeletes <= 0 ? Long.MAX_VALUE :
+             (System.currentTimeMillis() - this.timeToPurgeDeletes))
           : Long.MIN_VALUE;
         scanner = new StoreScanner(this, scan, scanners, smallestReadPoint,
             retainDeletesUntil);
@@ -1419,7 +1196,10 @@ public class Store extends SchemaConfigu
       }
     } finally {
       if (writer != null) {
-        writer.appendMetadata(maxId, majorCompaction);
+        if (minFlushTime == Long.MAX_VALUE) {
+          minFlushTime = HConstants.NO_MIN_FLUSH_TIME;
+        }
+        writer.appendMetadata(minFlushTime, maxId, majorCompaction);
         writer.close();
       }
     }
@@ -1933,10 +1713,7 @@ public class Store extends SchemaConfigu
   }
 
   boolean throttleCompaction(long compactionSize) {
-    long throttlePoint = conf.getLong(
-        "hbase.regionserver.thread.compaction.throttle",
-        2 * this.minFilesToCompact * this.region.memstoreFlushSize);
-    return compactionSize > throttlePoint;
+    return compactionManager.throttleCompaction(compactionSize);
   }
 
   HRegion getHRegion() {
@@ -2025,7 +1802,7 @@ public class Store extends SchemaConfigu
    *  the number defined in minFilesToCompact
    */
   public boolean needsCompaction() {
-    return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
+    return compactionManager.needsCompaction(storefiles.size() - filesCompacting.size());
   }
 
   /**
@@ -2037,8 +1814,8 @@ public class Store extends SchemaConfigu
 
   public static final long FIXED_OVERHEAD =
       ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
-          + (16 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
-          + (6 * Bytes.SIZEOF_INT) + 2 * Bytes.SIZEOF_BOOLEAN);
+          (17 * ClassSize.REFERENCE) + (5 * Bytes.SIZEOF_LONG)
+          + (4 * Bytes.SIZEOF_INT) + 2 * Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
@@ -2054,4 +1831,9 @@ public class Store extends SchemaConfigu
     return comparator;
   }
 
+  void updateConfiguration() {
+    setCompactionPolicy(conf.get(HConstants.COMPACTION_MANAGER_CLASS,
+                                 HConstants.DEFAULT_COMPACTION_MANAGER_CLASS));
+  }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1378348&r1=1378347&r2=1378348&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Aug 28 21:13:38 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
@@ -62,6 +63,7 @@ import org.apache.hadoop.hbase.util.Bloo
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableUtils;
@@ -107,6 +109,9 @@ public class StoreFile extends SchemaCon
   /** Max Sequence ID in FileInfo */
   public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
 
+  /** Min Flush time in FileInfo */
+  public static final byte [] MIN_FLUSH_TIME = Bytes.toBytes("MIN_FLUSH_TIME");
+
   /** Major compaction flag in FileInfo */
   public static final byte[] MAJOR_COMPACTION_KEY =
       Bytes.toBytes("MAJOR_COMPACTION_KEY");
@@ -153,6 +158,8 @@ public class StoreFile extends SchemaCon
   // Keys for metadata stored in backing HFile.
   // Set when we obtain a Reader.
   private long sequenceid = -1;
+  // Defaults to HConstants.NO_MIN_FLUSH_TIME; stays there if the file was written without the field.
+  private long minFlushTime = HConstants.NO_MIN_FLUSH_TIME;
 
   // max of the MemstoreTS in the KV's in this store
   // Set when we obtain a Reader.
@@ -343,6 +350,22 @@ public class StoreFile extends SchemaCon
     return this.sequenceid;
   }
 
+  public boolean hasMinFlushTime() {
+    return this.minFlushTime != HConstants.NO_MIN_FLUSH_TIME;
+  }
+
+  public long getMinFlushTime() {
+    // Bulk-loaded files are assumed to contain very old data; return 0.
+    if (isBulkLoadResult() && getMaxSequenceId() <= 0) {
+      return 0;
+    } else if (this.minFlushTime == HConstants.NO_MIN_FLUSH_TIME) {
+      // File was written without the minFlushTime field; assume recent data.
+      return EnvironmentEdgeManager.currentTimeMillis();
+    } else {
+      return this.minFlushTime;
+    }
+  }
+
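
A sketch of how level assignment might consume these fallbacks against the tier
bounds defined in TierCompactionConfiguration below (the actual assignment
lives in TierCompactionManager, not shown in this part of the diff):

    static int tierOf(long now, StoreFile file, TierCompactionConfiguration cfg) {
      long age = now - file.getMinFlushTime();   // bulk loads look infinitely old
      long size = file.getReader().length();
      for (int t = 0; t < cfg.getNumCompactionTiers(); t++) {
        TierCompactionConfiguration.CompactionTier tier = cfg.getCompactionTier(t);
        if (age <= tier.getMaxAgeInDisk() && size <= tier.getMaxSize()) {
          return t;  // newest tier the file still fits in
        }
      }
      return cfg.getNumCompactionTiers() - 1;    // oldest tier catches the rest
    }
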
   public long getModificationTimeStamp() {
     return modificationTimeStamp;
   }
@@ -462,7 +485,10 @@ public class StoreFile extends SchemaCon
         }
       }
     }
-    
+    b = metadataMap.get(MIN_FLUSH_TIME);
+    if (b != null) {
+      this.minFlushTime = Bytes.toLong(b);
+    }
     this.reader.setSequenceID(this.sequenceid);
 
     b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
@@ -891,18 +917,30 @@ public class StoreFile extends SchemaCon
     /**
      * Writes meta data.
      * Call before {@link #close()} since its written as meta data to this file.
+     * @param minFlushTime Minimum flush time across the data present in this file.
      * @param maxSequenceId Maximum sequence id.
      * @param majorCompaction True if this file is product of a major compaction
      * @throws IOException problem writing to FS
      */
-    public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
-    throws IOException {
+    public void appendMetadata(
+      final long minFlushTime, final long maxSequenceId, final boolean majorCompaction)
+      throws IOException {
       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
-      writer.appendFileInfo(MAJOR_COMPACTION_KEY,
-          Bytes.toBytes(majorCompaction));
+      writer.appendFileInfo(MAJOR_COMPACTION_KEY,
+          Bytes.toBytes(majorCompaction));
+      writer.appendFileInfo(MIN_FLUSH_TIME, Bytes.toBytes(minFlushTime));
       appendTimeRangeMetadata();
     }
 
+    // Former version that does not include minFlushTime
+    @Deprecated
+    public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
+        throws IOException {
+      appendMetadata(HConstants.NO_MIN_FLUSH_TIME, maxSequenceId, majorCompaction);
+    }
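
For reference, the two producer-side calls this patch introduces (both visible
in the Store.java hunks above): the flush path stamps the new file with the
current time, while the compaction path carries forward the minimum over its
inputs (or NO_MIN_FLUSH_TIME if none of them had the field):

    // Flush path:
    writer.appendMetadata(EnvironmentEdgeManager.currentTimeMillis(), logCacheFlushId, false);
    // Compaction path, after the Long.MAX_VALUE sentinel check:
    writer.appendMetadata(minFlushTime, maxId, majorCompaction);
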
+
     /**
      * Add TimestampRange to Metadata
      */

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java?rev=1378348&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/TierCompactionConfiguration.java Tue Aug 28 21:13:38 2012
@@ -0,0 +1,272 @@
+/**
+ * Copyright 2012 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+import java.text.DecimalFormat;
+
+/**
+ * Control knobs for the tier-based compaction algorithm
+ */
+public class TierCompactionConfiguration extends CompactionConfiguration {
+
+  private CompactionTier[] compactionTier;
+  private boolean recentFirstOrder;
+
+  TierCompactionConfiguration(Configuration conf, Store store) {
+    super(conf, store);
+
+    String strPrefix = "hbase.hstore.compaction.";
+    String strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString()
+                       + "cf." + store.getFamily().getNameAsString() + ".";
+    String strDefault = "Default.";
+    String strAttribute;
+    // If a value is not set for this family, fall back to the "Default." keys;
+    // if the default is not set either, use 1 tier.
+
+    strAttribute = "NumCompactionTiers";
+    compactionTier = new CompactionTier[
+      conf.getInt(strPrefix + strSchema  + strAttribute,
+      conf.getInt(strPrefix + strDefault + strAttribute,
+      1))];
+
+    strAttribute = "IsRecentFirstOrder";
+    recentFirstOrder =
+      conf.getBoolean(strPrefix + strSchema  + strAttribute,
+      conf.getBoolean(strPrefix + strDefault + strAttribute,
+      true));
+
+    strAttribute = "MinCompactSize";
+    minCompactSize =
+      conf.getLong(strPrefix + strSchema  + strAttribute,
+      conf.getLong(strPrefix + strDefault + strAttribute,
+      0));
+
+    strAttribute = "MaxCompactSize";
+    maxCompactSize =
+      conf.getLong(strPrefix + strSchema  + strAttribute,
+      conf.getLong(strPrefix + strDefault + strAttribute,
+      Long.MAX_VALUE));
+
+    strAttribute = "ShouldExcludeBulk";
+    shouldExcludeBulk =
+      conf.getBoolean(strPrefix + strSchema  + strAttribute,
+      conf.getBoolean(strPrefix + strDefault + strAttribute,
+      shouldExcludeBulk));
+
+    strAttribute = "ShouldDeleteExpired";
+    shouldDeleteExpired =
+      conf.getBoolean(strPrefix + strSchema  + strAttribute,
+      conf.getBoolean(strPrefix + strDefault + strAttribute,
+      shouldDeleteExpired));
+
+    strAttribute = "ThrottlePoint";
+    throttlePoint =
+      conf.getLong(strPrefix + strSchema  + strAttribute,
+      conf.getLong(strPrefix + strDefault + strAttribute,
+      throttlePoint));
+
+    strAttribute = "MajorCompactionPeriod";
+    majorCompactionPeriod =
+      conf.getLong(strPrefix + strSchema  + strAttribute,
+      conf.getLong(strPrefix + strDefault + strAttribute,
+      majorCompactionPeriod));
+
+    strAttribute = "MajorCompactionJitter";
+    majorCompactionJitter =
+      conf.getFloat(
+          strPrefix + strSchema + strAttribute,
+          conf.getFloat(
+              strPrefix + strDefault + strAttribute,
+              majorCompactionJitter
+          )
+      );
+
+    for (int i = 0; i < compactionTier.length; i++) {
+      compactionTier[i] = new CompactionTier(i);
+    }
+  }
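
Given the key concatenation above, a hedged example of setting these knobs
(table and column-family names are hypothetical; note that strSchema yields
"tbl.<table>cf.<cf>." with no separator between the table name and "cf."):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class TierConfigExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Cluster-wide defaults use the "Default." pseudo-schema:
        conf.setInt("hbase.hstore.compaction.Default.NumCompactionTiers", 3);
        conf.setBoolean("hbase.hstore.compaction.Default.IsRecentFirstOrder", true);
        // Per-table/CF override, concatenated exactly as the constructor builds it:
        conf.setInt("hbase.hstore.compaction.tbl.MyTablecf.MyCF.NumCompactionTiers", 4);
      }
    }
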
+  /**
+   * @return Number of compaction Tiers
+   */
+  int getNumCompactionTiers() {
+    return compactionTier.length;
+  }
+
+  /**
+   * @return The i-th tier from most recent
+   */
+  CompactionTier getCompactionTier(int i) {
+    return compactionTier[i];
+  }
+
+  /**
+   * @return Whether the tiers will be checked for compaction from newest to oldest
+   */
+  boolean isRecentFirstOrder() {
+    return recentFirstOrder;
+  }
+
+  /**
+   * Parameters for each tier
+   */
+  class CompactionTier {
+
+    private long maxAgeInDisk;
+    private long maxSize;
+    private double tierCompactionRatio;
+    private int tierMinFilesToCompact;
+    private int tierMaxFilesToCompact;
+    private int endingIndexForTier;
+
+    CompactionTier(int tier) {
+      String strPrefix = "hbase.hstore.compaction.";
+      String strSchema = "tbl." + store.getHRegion().getTableDesc().getNameAsString()
+                         + "cf." + store.getFamily().getNameAsString() + ".";
+      String strDefault = "Default.";
+      String strDefTier = "";
+      String strTier = "Tier." + String.valueOf(tier) + ".";
+      String strAttribute;
+
+      /*
+       * Use the value set for the current family and current tier.
+       * If not set, use the value for the current family, default tier;
+       * if not set, the value for the Default family, current tier;
+       * if not set, the value for the Default family, default tier;
+       * else fall back to the hard-coded default.
+       */
+
+      strAttribute = "MaxAgeInDisk";
+      maxAgeInDisk =
+        conf.getLong(strPrefix + strSchema  + strTier + strAttribute,
+        conf.getLong(strPrefix + strDefault + strTier + strAttribute,
+        Long.MAX_VALUE));
+
+      strAttribute = "MaxSize";
+      maxSize =
+        conf.getLong(strPrefix + strSchema  + strTier + strAttribute,
+        conf.getLong(strPrefix + strDefault + strTier + strAttribute,
+        Long.MAX_VALUE));
+
+      strAttribute = "CompactionRatio";
+      tierCompactionRatio = (double)
+        conf.getFloat(strPrefix + strSchema  + strTier  + strAttribute,
+        conf.getFloat(strPrefix + strSchema  + strDefTier + strAttribute,
+        conf.getFloat(strPrefix + strDefault + strTier  + strAttribute,
+        conf.getFloat(strPrefix + strDefault + strDefTier + strAttribute,
+        (float) compactionRatio))));
+
+      strAttribute = "MinFilesToCompact";
+      tierMinFilesToCompact =
+        conf.getInt(strPrefix + strSchema  + strTier  + strAttribute,
+        conf.getInt(strPrefix + strSchema  + strDefTier + strAttribute,
+        conf.getInt(strPrefix + strDefault + strTier  + strAttribute,
+        conf.getInt(strPrefix + strDefault + strDefTier + strAttribute,
+        minFilesToCompact))));
+
+      strAttribute = "MaxFilesToCompact";
+      tierMaxFilesToCompact =
+        conf.getInt(strPrefix + strSchema  + strTier  + strAttribute,
+        conf.getInt(strPrefix + strSchema  + strDefTier + strAttribute,
+        conf.getInt(strPrefix + strDefault + strTier  + strAttribute,
+        conf.getInt(strPrefix + strDefault + strDefTier + strAttribute,
+        maxFilesToCompact))));
+
+      strAttribute = "EndingIndexForTier";
+      endingIndexForTier =
+        conf.getInt(strPrefix + strSchema  + strTier + strAttribute,
+        conf.getInt(strPrefix + strDefault + strTier + strAttribute,
+        tier));
+
+      // Make sure this value is not incorrectly set.
+      if (endingIndexForTier < 0 || endingIndexForTier > tier) {
+        LOG.error("EndingIndexForTier improperly set. Using default value.");
+        endingIndexForTier = tier;
+      }
+
+    }
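
The nested conf.get* calls above resolve each tier attribute in this order
(keys shown for CompactionRatio on tier 1; the final fallback is the
family-level value inherited from CompactionConfiguration):

    // 1. hbase.hstore.compaction.tbl.<table>cf.<cf>.Tier.1.CompactionRatio
    // 2. hbase.hstore.compaction.tbl.<table>cf.<cf>.CompactionRatio
    // 3. hbase.hstore.compaction.Default.Tier.1.CompactionRatio
    // 4. hbase.hstore.compaction.Default.CompactionRatio
    // 5. compactionRatio from CompactionConfiguration
    conf.setFloat("hbase.hstore.compaction.Default.Tier.1.CompactionRatio", 1.5f);
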
+
+    /**
+     * @return Upper bound on a store file's age on disk (now - minFlushTime) for this tier
+     */
+    long getMaxAgeInDisk() {
+      return maxAgeInDisk;
+    }
+
+    /**
+     * @return Upper bound on storeFile's size to be included in this tier
+     */
+    long getMaxSize() {
+      return maxSize;
+    }
+
+    /**
+     * @return Compaction ratio for selections of this tier
+     */
+    double getCompactionRatio() {
+      return tierCompactionRatio;
+    }
+
+    /**
+     * @return lower bound on number of files in selections of this tier
+     */
+    int getMinFilesToCompact() {
+      return tierMinFilesToCompact;
+    }
+
+    /**
+     * @return upper bound on number of files in selections of this tier
+     */
+    int getMaxFilesToCompact() {
+      return tierMaxFilesToCompact;
+    }
+
+    /**
+     * @return the newest tier which will also be included in selections of this tier
+     *  by default it is the index of this tier, must be between 0 and this tier
+     */
+    int getEndingIndexForTier() {
+      return endingIndexForTier;
+    }
+
+    String getDescription() {
+      String ageString = "INF";
+      String sizeString = "INF";
+      if (getMaxAgeInDisk() < Long.MAX_VALUE) {
+        ageString = StringUtils.formatTime(getMaxAgeInDisk());
+      }
+      if (getMaxSize() < Long.MAX_VALUE) {
+        sizeString = StringUtils.humanReadableInt(getMaxSize());
+      }
+      String ret = "Has files up to age " + ageString
+          + " and up to size " + sizeString + ". "
+          + "Compaction ratio: " + (new DecimalFormat("#.##")).format(getCompactionRatio()) + ", "
+          + "compaction selections have at least " + getMinFilesToCompact() + " and "
+          + "at most " + getMaxFilesToCompact() + " files, and "
+          + "selections in this tier include files up to tier " + getEndingIndexForTier();
+      return ret;
+    }
+
+  }
+
+}
\ No newline at end of file


