hbase-commits mailing list archives

From st...@apache.org
Subject [2/3] hbase git commit: HBASE-10201 Port 'Make flush decisions per column family' to trunk
Date Fri, 19 Dec 2014 00:06:20 GMT
HBASE-10201 Port 'Make flush decisions per column family' to trunk

Signed-off-by: stack <stack@apache.org>

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
	hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e55ef7a6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e55ef7a6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e55ef7a6

Branch: refs/heads/branch-1
Commit: e55ef7a663dd9a18fa88a506afd8fe0ced10563d
Parents: 9895604
Author: zhangduo <zhangduo@wandoujia.com>
Authored: Sat Dec 13 12:49:38 2014 +0800
Committer: stack <stack@apache.org>
Committed: Thu Dec 18 15:58:55 2014 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HTableDescriptor.java   |  24 +
 .../src/main/resources/hbase-default.xml        |  19 +-
 .../regionserver/FlushAllStoresPolicy.java      |  35 +
 .../regionserver/FlushLargeStoresPolicy.java    | 108 ++++
 .../hadoop/hbase/regionserver/FlushPolicy.java  |  49 ++
 .../hbase/regionserver/FlushPolicyFactory.java  |  76 +++
 .../hbase/regionserver/FlushRequester.java      |  15 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 314 ++++++---
 .../hbase/regionserver/HRegionServer.java       |   4 +-
 .../hadoop/hbase/regionserver/LogRoller.java    |   3 +-
 .../hbase/regionserver/MemStoreFlusher.java     |  81 ++-
 .../hbase/regionserver/RSRpcServices.java       |  12 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 242 +++++--
 .../hbase/regionserver/wal/FSWALEntry.java      |  29 +-
 .../hadoop/hbase/wal/DisabledWALProvider.java   |   8 +-
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |  11 +-
 .../org/apache/hadoop/hbase/TestIOFencing.java  |   4 +-
 .../regionserver/TestFlushRegionEntry.java      |   4 +-
 .../regionserver/TestHeapMemoryManager.java     |  16 +-
 .../regionserver/TestPerColumnFamilyFlush.java  | 644 +++++++++++++++++++
 .../hbase/regionserver/wal/TestFSHLog.java      |  42 +-
 .../hbase/regionserver/wal/TestWALReplay.java   |  19 +-
 .../hbase/wal/TestDefaultWALProvider.java       |  73 ++-
 .../apache/hadoop/hbase/wal/TestWALFactory.java |  36 +-
 24 files changed, 1549 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index d16e8ba..ed0cec2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -144,6 +144,8 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
   private static final ImmutableBytesWritable MEMSTORE_FLUSHSIZE_KEY =
     new ImmutableBytesWritable(Bytes.toBytes(MEMSTORE_FLUSHSIZE));
 
+  public static final String FLUSH_POLICY = "FLUSH_POLICY";
+
   /**
    * <em>INTERNAL</em> Used by rest interface to access this metadata
    * attribute which denotes if the table is a -ROOT- region or not
@@ -779,6 +781,28 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
   }
 
   /**
+   * This sets the class associated with the flush policy which determines the stores that
+   * need to be flushed when flushing a region. The class used by default is defined in
+   * {@link org.apache.hadoop.hbase.regionserver.FlushPolicy}
+   * @param clazz the class name
+   */
+  public HTableDescriptor setFlushPolicyClassName(String clazz) {
+    setValue(FLUSH_POLICY, clazz);
+    return this;
+  }
+
+  /**
+   * This gets the class associated with the flush policy which determines the stores that need to
+   * be flushed when flushing a region. The class used by default is defined in
+   * {@link org.apache.hadoop.hbase.regionserver.FlushPolicy}
+   * @return the class name of the flush policy for this table. If this returns null, the default
+   *         flush policy is used.
+   */
+  public String getFlushPolicyClassName() {
+    return getValue(FLUSH_POLICY);
+  }
+
+  /**
    * Adds a column family.
    * For the updating purpose please use {@link #modifyFamily(HColumnDescriptor)} instead.
    * @param family HColumnDescriptor of family to add.

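The new FLUSH_POLICY attribute is stored via setValue/getValue, so a policy can be pinned per table at creation time. A minimal client-side sketch, assuming the branch-1 Connection/Admin API; the table name, family name, and the policy class chosen below are illustrative, not part of this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateTableWithFlushPolicy {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("example_table"));
      htd.addFamily(new HColumnDescriptor("cf1"));
      // Per-table override; when this attribute is absent, FlushPolicyFactory
      // falls back to hbase.regionserver.flush.policy, then to its default.
      htd.setFlushPolicyClassName(
          "org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy");
      admin.createTable(htd);
    }
  }
}
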
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 9730560..9f0c3fe 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -187,7 +187,7 @@ possible configurations would overwhelm and obscure the important.
       A value of 0 means a single queue shared between all the handlers.
       A value of 1 means that each handler has its own queue.</description>
   </property>
-<property>
+  <property>
     <name>hbase.ipc.server.callqueue.read.ratio</name>
     <value>0</value>
     <description>Split the call queues into read and write queues.
@@ -337,8 +337,8 @@ possible configurations would overwhelm and obscure the important.
     <value>org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy</value>
     <description>
       A split policy determines when a region should be split. The various other split policies that
-      are available currently are ConstantSizeRegionSplitPolicy, DisabledRegionSplitPolicy, 
-      DelimitedKeyPrefixRegionSplitPolicy, KeyPrefixRegionSplitPolicy etc.  
+      are available currently are ConstantSizeRegionSplitPolicy, DisabledRegionSplitPolicy,
+      DelimitedKeyPrefixRegionSplitPolicy, KeyPrefixRegionSplitPolicy etc.
     </description>
   </property>
 
@@ -596,6 +596,19 @@ possible configurations would overwhelm and obscure the important.
     every hbase.server.thread.wakefrequency.</description>
   </property>
   <property>
+    <name>hbase.hregion.percolumnfamilyflush.size.lower.bound</name>
+    <value>16777216</value>
+    <description>
+    If FlushLargeStoresPolicy is used, then every time that we hit the
+    total memstore limit, we find out all the column families whose memstores
+    exceed this value, and only flush them, while retaining the others whose
+    memstores are lower than this limit. If none of the families have their
+    memstore size more than this, all the memstores will be flushed
+    (just as usual). This value should be less than half of the total memstore
+    threshold (hbase.hregion.memstore.flush.size).
+    </description>
+  </property>
+  <property>
     <name>hbase.hregion.preclose.flush.size</name>
     <value>5242880</value>
     <description>

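As a hedged illustration of tuning the new key: an operator with one hot column family among several cold ones might lower the bound in hbase-site.xml. The 8 MB value below is invented for the example; per the description above, it must stay well under hbase.hregion.memstore.flush.size:

  <property>
    <name>hbase.hregion.percolumnfamilyflush.size.lower.bound</name>
    <value>8388608</value>
    <description>Example override: flush any column family whose memstore
    exceeds 8MB, instead of the default 16MB.</description>
  </property>
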
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
new file mode 100644
index 0000000..0058104
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link FlushPolicy} that always flushes all stores for a given region.
+ */
+@InterfaceAudience.Private
+public class FlushAllStoresPolicy extends FlushPolicy {
+
+  @Override
+  public Collection<Store> selectStoresToFlush() {
+    return region.stores.values();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
new file mode 100644
index 0000000..7e0e54c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link FlushPolicy} that only flushes stores larger than a given threshold. If no store is
+ * enough, then all stores will be flushed.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class FlushLargeStoresPolicy extends FlushPolicy {
+
+  private static final Log LOG = LogFactory.getLog(FlushLargeStoresPolicy.class);
+
+  public static final String HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND =
+      "hbase.hregion.percolumnfamilyflush.size.lower.bound";
+
+  private static final long DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND = 1024 * 1024 * 16L;
+
+  private long flushSizeLowerBound;
+
+  @Override
+  protected void configureForRegion(HRegion region) {
+    super.configureForRegion(region);
+    long flushSizeLowerBound;
+    String flushedSizeLowerBoundString =
+        region.getTableDesc().getValue(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+    if (flushedSizeLowerBoundString == null) {
+      flushSizeLowerBound =
+          getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+            DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND
+            + " is not specified, use global config(" + flushSizeLowerBound + ") instead");
+      }
+    } else {
+      try {
+        flushSizeLowerBound = Long.parseLong(flushedSizeLowerBoundString);
+      } catch (NumberFormatException nfe) {
+        flushSizeLowerBound =
+            getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+              DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+        LOG.warn("Number format exception when parsing "
+            + HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND + " for table "
+            + region.getTableDesc().getTableName() + ":" + flushedSizeLowerBoundString + ". " + nfe
+            + ", use global config(" + flushSizeLowerBound + ") instead");
+
+      }
+    }
+    this.flushSizeLowerBound = flushSizeLowerBound;
+  }
+
+  private boolean shouldFlush(Store store) {
+    if (store.getMemStoreSize() > this.flushSizeLowerBound) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region
+            + " will be flushed because of memstoreSize(" + store.getMemStoreSize()
+            + ") is larger than lower bound(" + this.flushSizeLowerBound + ")");
+      }
+      return true;
+    }
+    return region.shouldFlushStore(store);
+  }
+
+  @Override
+  public Collection<Store> selectStoresToFlush() {
+    Collection<Store> stores = region.stores.values();
+    Set<Store> specificStoresToFlush = new HashSet<Store>();
+    for (Store store : stores) {
+      if (shouldFlush(store)) {
+        specificStoresToFlush.add(store);
+      }
+    }
+    // Didn't find any CFs which were above the threshold for selection.
+    if (specificStoresToFlush.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Since none of the CFs were above the size, flushing all.");
+      }
+      return stores;
+    } else {
+      return specificStoresToFlush;
+    }
+  }
+
+}

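To make the selection rule concrete, a toy walkthrough with invented sizes (the real policy also consults HRegion.shouldFlushStore() for stores with old edits; this sketch mirrors only the size test): a region with families at 60 MB, 3 MB, and 2 MB under the default 16 MB bound flushes only the 60 MB family.

public class FlushSelectionWalkthrough {
  public static void main(String[] args) {
    long[] memstoreSizes = { 60L << 20, 3L << 20, 2L << 20 }; // invented sizes
    long lowerBound = 16L << 20; // default of the new config key
    int selected = 0;
    for (long size : memstoreSizes) {
      if (size > lowerBound) {
        selected++;
      }
    }
    // Mirrors FlushLargeStoresPolicy: if nothing crosses the bound,
    // fall back to flushing every store.
    System.out.println(selected == 0
        ? "no store above bound -> flush all stores"
        : selected + " store(s) above bound -> flush only those");
  }
}
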
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
new file mode 100644
index 0000000..d581fee
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A flush policy determines the stores that need to be flushed when flushing a region.
+ */
+@InterfaceAudience.Private
+public abstract class FlushPolicy extends Configured {
+
+  /**
+   * The region configured for this flush policy.
+   */
+  protected HRegion region;
+
+  /**
+   * Upon construction, this method will be called with the region to be governed. It will be called
+   * once and only once.
+   */
+  protected void configureForRegion(HRegion region) {
+    this.region = region;
+  }
+
+  /**
+   * @return the stores that need to be flushed.
+   */
+  public abstract Collection<Store> selectStoresToFlush();
+
+}

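FlushPolicy is the extension point the rest of the patch plugs into. A hypothetical subclass sketch follows; the class name and heuristic are invented here, it must live in the regionserver package because region.stores and HRegion.shouldFlushStore() are package-scoped, and it has not been vetted against every flush path:

package org.apache.hadoop.hbase.regionserver;

import java.util.Collection;
import java.util.Collections;

// Invented policy: flush every store as soon as any one store has edits
// old enough that HRegion.shouldFlushStore() says it should go; otherwise
// select nothing and wait for the next flush check.
public class FlushAllWhenAnyStaleStorePolicy extends FlushPolicy {

  @Override
  public Collection<Store> selectStoresToFlush() {
    for (Store store : region.stores.values()) {
      if (region.shouldFlushStore(store)) {
        return region.stores.values();
      }
    }
    return Collections.emptySet();
  }
}
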
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
new file mode 100644
index 0000000..e80b696
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * The class that creates a flush policy from a conf and HTableDescriptor.
+ * <p>
+ * The default flush policy is {@link FlushLargeStoresPolicy}. For 0.98, the default flush
+ * policy is {@link FlushAllStoresPolicy}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class FlushPolicyFactory {
+
+  private static final Log LOG = LogFactory.getLog(FlushPolicyFactory.class);
+
+  public static final String HBASE_FLUSH_POLICY_KEY = "hbase.regionserver.flush.policy";
+
+  private static final Class<? extends FlushPolicy> DEFAULT_FLUSH_POLICY_CLASS =
+      FlushLargeStoresPolicy.class;
+
+  /**
+   * Create the FlushPolicy configured for the given table.
+   */
+  public static FlushPolicy create(HRegion region, Configuration conf) throws IOException {
+    Class<? extends FlushPolicy> clazz = getFlushPolicyClass(region.getTableDesc(), conf);
+    FlushPolicy policy = ReflectionUtils.newInstance(clazz, conf);
+    policy.configureForRegion(region);
+    return policy;
+  }
+
+  /**
+   * Get FlushPolicy class for the given table.
+   */
+  public static Class<? extends FlushPolicy> getFlushPolicyClass(HTableDescriptor htd,
+      Configuration conf) throws IOException {
+    String className = htd.getFlushPolicyClassName();
+    if (className == null) {
+      className = conf.get(HBASE_FLUSH_POLICY_KEY, DEFAULT_FLUSH_POLICY_CLASS.getName());
+    }
+    try {
+      Class<? extends FlushPolicy> clazz = Class.forName(className).asSubclass(FlushPolicy.class);
+      return clazz;
+    } catch (Exception e) {
+      LOG.warn(
+        "Unable to load configured flush policy '" + className + "' for table '"
+            + htd.getTableName() + "', load default flush policy "
+            + DEFAULT_FLUSH_POLICY_CLASS.getName() + " instead", e);
+      return DEFAULT_FLUSH_POLICY_CLASS;
+    }
+  }
+}

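The lookup order in getFlushPolicyClass() is: per-table attribute first, then the hbase.regionserver.flush.policy key, then the compiled-in default. A small sketch of selecting the pre-HBASE-10201 behavior cluster-wide (normally this would go in hbase-site.xml; the programmatic form is shown only for brevity):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ClusterWideFlushPolicyConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Regions without a per-table FLUSH_POLICY attribute will use
    // FlushAllStoresPolicy, i.e. flush-everything, as before this patch.
    conf.set("hbase.regionserver.flush.policy",
        "org.apache.hadoop.hbase.regionserver.FlushAllStoresPolicy");
    System.out.println(conf.get("hbase.regionserver.flush.policy"));
  }
}
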
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
index e1c3144..7517454 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
@@ -30,26 +30,31 @@ public interface FlushRequester {
    * Tell the listener the cache needs to be flushed.
    *
    * @param region the HRegion requesting the cache flush
+   * @param forceFlushAllStores whether we want to flush all stores, e.g., when the request comes
+   *          from log rolling.
    */
-  void requestFlush(HRegion region);
+  void requestFlush(HRegion region, boolean forceFlushAllStores);
+
   /**
    * Tell the listener the cache needs to be flushed after a delay
    *
    * @param region the HRegion requesting the cache flush
    * @param delay after how much time should the flush happen
+   * @param forceFlushAllStores whether we want to flush all stores, e.g., when the request comes
+   *          from log rolling.
    */
-  void requestDelayedFlush(HRegion region, long delay);
+  void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores);
 
   /**
    * Register a FlushRequestListener
-   * 
+   *
    * @param listener
    */
   void registerFlushRequestListener(final FlushRequestListener listener);
 
   /**
    * Unregister the given FlushRequestListener
-   * 
+   *
    * @param listener
    * @return true when passed listener is unregistered successfully.
    */
@@ -57,7 +62,7 @@ public interface FlushRequester {
 
   /**
    * Sets the global memstore limit to a new size.
-   * 
+   *
    * @param globalMemStoreSize
    */
   public void setGlobalMemstoreLimit(long globalMemStoreSize);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 33aa8de..7ada09a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +42,7 @@ import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -62,7 +64,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -132,14 +134,9 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.Write
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
-import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
-import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -155,6 +152,11 @@ import org.apache.hadoop.hbase.util.HashedBytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.util.StringUtils;
 
@@ -228,10 +230,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   final AtomicBoolean closing = new AtomicBoolean(false);
 
   /**
-   * The sequence id of the last flush on this region.  Used doing some rough calculations on
+   * The max sequence id of flushed data on this region.  Used doing some rough calculations on
    * whether time to flush or not.
    */
-  protected volatile long lastFlushSeqId = -1L;
+  protected volatile long maxFlushedSeqId = -1L;
 
   /**
    * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL
@@ -516,7 +518,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   long memstoreFlushSize;
   final long timestampSlop;
   final long rowProcessorTimeout;
-  private volatile long lastFlushTime;
+
+  // Last flush time for each Store. Useful when we are flushing for each column
+  private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
+      new ConcurrentHashMap<Store, Long>();
+
   final RegionServerServices rsServices;
   private RegionServerAccounting rsAccounting;
   private long flushCheckInterval;
@@ -542,6 +548,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
   private HTableDescriptor htableDescriptor = null;
   private RegionSplitPolicy splitPolicy;
+  private FlushPolicy flushPolicy;
 
   private final MetricsRegion metricsRegion;
   private final MetricsRegionWrapperImpl metricsRegionWrapper;
@@ -618,7 +625,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
           + MAX_FLUSH_PER_CHANGES);
     }
-
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
                     DEFAULT_ROWLOCK_WAIT_DURATION);
 
@@ -777,8 +783,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     // Initialize split policy
     this.splitPolicy = RegionSplitPolicy.create(this, conf);
 
-    this.lastFlushTime = EnvironmentEdgeManager.currentTime();
-    // Use maximum of wal sequenceid or that which was found in stores
+    // Initialize flush policy
+    this.flushPolicy = FlushPolicyFactory.create(this, conf);
+
+    long lastFlushTime = EnvironmentEdgeManager.currentTime();
+    for (Store store: stores.values()) {
+      this.lastStoreFlushTimeMap.put(store, lastFlushTime);
+    }
+
+    // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
     long nextSeqid = maxSeqId;
 
@@ -1316,10 +1329,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         status.setStatus("Running coprocessor post-close hooks");
         this.coprocessorHost.postClose(abort);
       }
-      if ( this.metricsRegion != null) {
+      if (this.metricsRegion != null) {
         this.metricsRegion.close();
       }
-      if ( this.metricsRegionWrapper != null) {
+      if (this.metricsRegionWrapper != null) {
         Closeables.closeQuietly(this.metricsRegionWrapper);
       }
       status.markComplete("Closed");
@@ -1458,9 +1471,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     return this.fs;
   }
 
-  /** @return the last time the region was flushed */
-  public long getLastFlushTime() {
-    return this.lastFlushTime;
+  /**
+   * @return the earliest time a store in the region was flushed. All
+   *         other stores in the region would have been flushed either at, or
+   *         after this time.
+   */
+  @VisibleForTesting
+  public long getEarliestFlushTimeForAllStores() {
+    return Collections.min(lastStoreFlushTimeMap.values());
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -1626,6 +1644,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   }
 
   /**
+   * Flush all stores.
+   * <p>
+   * See {@link #flushcache(boolean)}.
+   *
+   * @return whether the flush succeeded and whether the region needs compacting
+   * @throws IOException
+   */
+  public FlushResult flushcache() throws IOException {
+    return flushcache(true);
+  }
+
+  /**
    * Flush the cache.
    *
    * When this method is called the cache will be flushed unless:
@@ -1638,14 +1668,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    *
    * <p>This method may block for some time, so it should not be called from a
    * time-sensitive thread.
-   *
-   * @return true if the region needs compacting
+   * @param forceFlushAllStores whether we want to flush all stores
+   * @return whether the flush succeeded and whether the region needs compacting
    *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of wal is required
    * because a Snapshot was not properly persisted.
    */
-  public FlushResult flushcache() throws IOException {
+  public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
       String msg = "Skipping flush on " + this + " because closing";
@@ -1687,8 +1717,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
         }
       }
+
       try {
-        FlushResult fs = internalFlushcache(status);
+        Collection<Store> specificStoresToFlush =
+            forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
+        FlushResult fs = internalFlushcache(specificStoresToFlush, status);
 
         if (coprocessorHost != null) {
           status.setStatus("Running post-flush coprocessor hooks");
@@ -1711,12 +1744,47 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   }
 
   /**
+   * Should the store be flushed because it is old enough.
+   * <p>
+   * Every FlushPolicy should call this to determine whether a store is old enough to flush (unless
+   * the policy always flushes all stores). Otherwise the {@link #shouldFlush()} method will always
+   * return true, which will generate a lot of flush requests.
+   */
+  boolean shouldFlushStore(Store store) {
+    long maxFlushedSeqId =
+        this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store
+            .getFamily().getName()) - 1;
+    if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
+            + " will be flushed because its max flushed seqId(" + maxFlushedSeqId
+            + ") is far away from current(" + sequenceId.get() + "), max allowed is "
+            + flushPerChanges);
+      }
+      return true;
+    }
+    if (flushCheckInterval <= 0) {
+      return false;
+    }
+    long now = EnvironmentEdgeManager.currentTime();
+    if (store.timeOfOldestEdit() < now - flushCheckInterval) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
+            + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit()
+            + ") is far away from now(" + now + "), max allowed is " + flushCheckInterval);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
    * Should the memstore be flushed now
    */
   boolean shouldFlush() {
     // This is a rough measure.
-    if (this.lastFlushSeqId > 0
-          && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
+    if (this.maxFlushedSeqId > 0
+          && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
       return true;
     }
     if (flushCheckInterval <= 0) { //disabled
@@ -1724,7 +1792,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
     long now = EnvironmentEdgeManager.currentTime();
     //if we flushed in the recent past, we don't need to do again now
-    if ((now - getLastFlushTime() < flushCheckInterval)) {
+    if ((now - getEarliestFlushTimeForAllStores() < flushCheckInterval)) {
       return false;
     }
     //since we didn't flush in the recent past, flush now if certain conditions
@@ -1739,35 +1807,56 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   }
 
   /**
-   * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
-   * memstore, all of which have also been written to the wal. We need to write those updates in the
-   * memstore out to disk, while being able to process reads/writes as much as possible during the
-   * flush operation.
-   * <p>This method may block for some time.  Every time you call it, we up the regions
-   * sequence id even if we don't flush; i.e. the returned region id will be at least one larger
-   * than the last edit applied to this region. The returned id does not refer to an actual edit.
-   * The returned id can be used for say installing a bulk loaded file just ahead of the last hfile
-   * that was the result of this flush, etc.
-   * @return object describing the flush's state
+   * Flush all stores.
    *
-   * @throws IOException general io exceptions
-   * @throws DroppedSnapshotException Thrown when replay of wal is required
-   * because a Snapshot was not properly persisted.
+   * @see #internalFlushcache(Collection, MonitoredTask)
    */
-  protected FlushResult internalFlushcache(MonitoredTask status)
+  private FlushResult internalFlushcache(MonitoredTask status)
       throws IOException {
-    return internalFlushcache(this.wal, -1, status);
+    return internalFlushcache(stores.values(), status);
+  }
+
+  /**
+   * Flush the given stores.
+   *
+   * @see #internalFlushcache(WAL, long, Collection, MonitoredTask)
+   */
+  private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
+      MonitoredTask status) throws IOException {
+    return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
+        status);
   }
 
   /**
-   * @param wal Null if we're NOT to go via wal.
-   * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
+   * Flush the memstore. Flushing the memstore is a little tricky. We have a lot
+   * of updates in the memstore, all of which have also been written to the wal.
+   * We need to write those updates in the memstore out to disk, while being
+   * able to process reads/writes as much as possible during the flush
+   * operation.
+   * <p>
+   * This method may block for some time. Every time you call it, we up the
+   * regions sequence id even if we don't flush; i.e. the returned region id
+   * will be at least one larger than the last edit applied to this region. The
+   * returned id does not refer to an actual edit. The returned id can be used
+   * for say installing a bulk loaded file just ahead of the last hfile that was
+   * the result of this flush, etc.
+   *
+   * @param wal
+   *          Null if we're NOT to go via wal.
+   * @param myseqid
+   *          The seqid to use if <code>wal</code> is null writing out flush
+   *          file.
+   * @param storesToFlush
+   *          The list of stores to flush.
    * @return object describing the flush's state
    * @throws IOException
-   * @see #internalFlushcache(MonitoredTask)
+   *           general io exceptions
+   * @throws DroppedSnapshotException
+   *           Thrown when replay of wal is required because a Snapshot was not
+   *           properly persisted.
    */
-  protected FlushResult internalFlushcache(
-      final WAL wal, final long myseqid, MonitoredTask status) throws IOException {
+  protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
+      final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
       // Don't flush when server aborting, it's unsafe
       throw new IOException("Aborting flush because server is aborted...");
@@ -1809,63 +1898,86 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       }
     }
 
-    LOG.info("Started memstore flush for " + this +
-      ", current region memstore size " +
-      StringUtils.byteDesc(this.memstoreSize.get()) +
-      ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
-
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Started memstore flush for " + this + ", current region memstore size "
+          + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
+          + stores.size() + " column families' memstores are being flushed."
+          + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
+      // only log when we are not flushing all stores.
+      if (this.stores.size() > storesToFlush.size()) {
+        for (Store store: storesToFlush) {
+          LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
+              + " which was occupying "
+              + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
+        }
+      }
+    }
     // Stop updates while we snapshot the memstore of all of these regions' stores. We only have
     // to do this for a moment.  It is quick. We also set the memstore size to zero here before we
     // allow updates again so its value will represent the size of the updates received
     // during flush
     MultiVersionConsistencyControl.WriteEntry w = null;
-
     // We have to take an update lock during snapshot, or else a write could end up in both snapshot
     // and memstore (makes it difficult to do atomic rows then)
     status.setStatus("Obtaining lock to block concurrent updates");
     // block waiting for the lock for internal flush
     this.updatesLock.writeLock().lock();
-    long totalFlushableSize = 0;
     status.setStatus("Preparing to flush by snapshotting stores in " +
       getRegionInfo().getEncodedName());
+    long totalFlushableSizeOfFlushableStores = 0;
+
+    Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
+    for (Store store: storesToFlush) {
+      flushedFamilyNames.add(store.getFamily().getName());
+    }
+
     List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
         Bytes.BYTES_COMPARATOR);
-    long flushSeqId = -1L;
+    // The sequence id of this flush operation which is used to log FlushMarker and pass to
+    // createFlushContext to use as the store file's sequence id.
+    long flushOpSeqId = HConstants.NO_SEQNUM;
+    // The max flushed sequence id after this flush operation. Used as completeSequenceId which is
+    // passed to HMaster.
+    long flushedSeqId = HConstants.NO_SEQNUM;
+    byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
 
     long trxId = 0;
     try {
       try {
         w = mvcc.beginMemstoreInsert();
         if (wal != null) {
-          if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
+          if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
             // This should never happen.
             String msg = "Flush will not be started for ["
                 + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
             status.setStatus(msg);
             return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
           }
-          // Get a sequence id that we can use to denote the flush. It will be one beyond the last
-          // edit that made it into the hfile (the below does not add an edit, it just asks the
-          // WAL system to return next sequence edit).
-          flushSeqId = getNextSequenceId(wal);
+          flushOpSeqId = getNextSequenceId(wal);
+          long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
+          // No oldestUnflushedSeqId means either we flushed all stores,
+          // or the unflushed stores are all empty.
+          flushedSeqId =
+              oldestUnflushedSeqId == HConstants.NO_SEQNUM ? flushOpSeqId : oldestUnflushedSeqId - 1;
         } else {
           // use the provided sequence Id as WAL is not being used for this flush.
-          flushSeqId = myseqid;
+          flushedSeqId = flushOpSeqId = myseqid;
         }
 
-        for (Store s : stores.values()) {
-          totalFlushableSize += s.getFlushableSize();
-          storeFlushCtxs.add(s.createFlushContext(flushSeqId));
+        for (Store s : storesToFlush) {
+          totalFlushableSizeOfFlushableStores += s.getFlushableSize();
+          storeFlushCtxs.add(s.createFlushContext(flushOpSeqId));
           committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
         }
 
         // write the snapshot start to WAL
         if (wal != null) {
           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
-            getRegionInfo(), flushSeqId, committedFiles);
+            getRegionInfo(), flushOpSeqId, committedFiles);
+          // no sync. Sync is below where we do not hold the updates lock
           trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
-            desc, sequenceId, false); // no sync. Sync is below where we do not hold the updates lock
+            desc, sequenceId, false);
         }
 
         // Prepare flush (take a snapshot)
@@ -1877,7 +1989,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           if (trxId > 0) { // check whether we have already written START_FLUSH to WAL
             try {
               FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
-                getRegionInfo(), flushSeqId, committedFiles);
+                getRegionInfo(), flushOpSeqId, committedFiles);
               WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
                 desc, sequenceId, false);
             } catch (Throwable t) {
@@ -1894,7 +2006,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         this.updatesLock.writeLock().unlock();
       }
       String s = "Finished memstore snapshotting " + this +
-        ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
+        ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
       status.setStatus(s);
       if (LOG.isTraceEnabled()) LOG.trace(s);
       // sync unflushed WAL changes
@@ -1913,7 +2025,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       // uncommitted transactions from being written into HFiles.
       // We have to block before we start the flush, otherwise keys that
       // were removed via a rollbackMemstore could be written to Hfiles.
-      w.setWriteNumber(flushSeqId);
+      w.setWriteNumber(flushOpSeqId);
       mvcc.waitForPreviousTransactionsComplete(w);
       // set w to null to prevent mvcc.advanceMemstore from being called again inside finally block
       w = null;
@@ -1944,8 +2056,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
-      Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have
-      // same order
+      Iterator<Store> it = storesToFlush.iterator();
+      // stores.values() and storeFlushCtxs have same order
       for (StoreFlushContext flush : storeFlushCtxs) {
         boolean needsCompaction = flush.commit(status);
         if (needsCompaction) {
@@ -1956,12 +2068,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       storeFlushCtxs.clear();
 
       // Set down the memstore size by amount of flush.
-      this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
+      this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
 
       if (wal != null) {
         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
         FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
-          getRegionInfo(), flushSeqId, committedFiles);
+          getRegionInfo(), flushOpSeqId, committedFiles);
         WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
           desc, sequenceId, true);
       }
@@ -1975,7 +2087,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       if (wal != null) {
         try {
           FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
-            getRegionInfo(), flushSeqId, committedFiles);
+            getRegionInfo(), flushOpSeqId, committedFiles);
           WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
             desc, sequenceId, false);
         } catch (Throwable ex) {
@@ -1998,10 +2110,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
 
     // Record latest flush time
-    this.lastFlushTime = EnvironmentEdgeManager.currentTime();
+    for (Store store: storesToFlush) {
+      this.lastStoreFlushTimeMap.put(store, startTime);
+    }
 
-    // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog.
-    this.lastFlushSeqId = flushSeqId;
+    // Update the oldest unflushed sequence id for region.
+    this.maxFlushedSeqId = flushedSeqId;
 
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
@@ -2011,18 +2125,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
     long time = EnvironmentEdgeManager.currentTime() - startTime;
     long memstoresize = this.memstoreSize.get();
-    String msg = "Finished memstore flush of ~" +
-      StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
-      ", currentsize=" +
-      StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
-      " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
-      ", compaction requested=" + compactionRequested +
-      ((wal == null)? "; wal=null": "");
+    String msg = "Finished memstore flush of ~"
+        + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
+        + totalFlushableSizeOfFlushableStores + ", currentsize="
+        + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
+        + " for region " + this + " in " + time + "ms, sequenceid="
+        + flushOpSeqId +  ", compaction requested=" + compactionRequested
+        + ((wal == null) ? "; wal=null" : "");
     LOG.info(msg);
     status.setStatus(msg);
 
     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
-        FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
+        FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushOpSeqId);
   }
 
   /**
@@ -2153,7 +2267,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     if(delete.getFamilyCellMap().isEmpty()){
       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
         // Don't eat the timestamp
-        delete.deleteFamily(family, delete.getTimeStamp());
+        delete.addFamily(family, delete.getTimeStamp());
       }
     } else {
       for(byte [] family : delete.getFamilyCellMap().keySet()) {
@@ -2804,6 +2918,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         coprocessorHost.postBatchMutate(miniBatchOp);
       }
 
+
       // ------------------------------------------------------------------
       // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
       // ------------------------------------------------------------------
@@ -2835,7 +2950,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       success = true;
       return addedSize;
     } finally {
-
       // if the wal sync was unsuccessful, remove keys from memstore
       if (doRollBackMemstore) {
         rollbackMemstore(memstoreCells);
@@ -3194,8 +3308,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * We throw RegionTooBusyException if above memstore limit
    * and expect client to retry using some kind of backoff
   */
-  private void checkResources()
-    throws RegionTooBusyException {
+  private void checkResources() throws RegionTooBusyException {
     // If catalog region, do not impose resource constraints or block updates.
     if (this.getRegionInfo().isMetaRegion()) return;
 
@@ -3391,7 +3504,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       writestate.flushRequested = true;
     }
     // Make request outside of synchronize block; HBASE-818.
-    this.rsServices.getFlushRequester().requestFlush(this);
+    this.rsServices.getFlushRequester().requestFlush(this, false);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Flush requested on " + this);
     }
@@ -3512,7 +3625,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
     if (seqid > minSeqIdForTheRegion) {
       // Then we added some edits to memory. Flush and cleanup split edit files.
-      internalFlushcache(null, seqid, status);
+      internalFlushcache(null, seqid, stores.values(), status);
     }
     // Now delete the content of recovered edits.  We're done w/ them.
     for (Path file: files) {
@@ -3666,7 +3779,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             editsCount++;
           }
           if (flush) {
-            internalFlushcache(null, currentEditSeqId, status);
+            internalFlushcache(null, currentEditSeqId, stores.values(), status);
           }
 
           if (coprocessorHost != null) {
@@ -4014,7 +4127,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
       // a sequence id that we can be sure is beyond the last hfile written).
       if (assignSeqId) {
-        FlushResult fs = this.flushcache();
+        FlushResult fs = this.flushcache(true);
         if (fs.isFlushSucceeded()) {
           seqId = fs.flushSequenceId;
         } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
@@ -5057,8 +5170,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
     FileSystem fs = a.getRegionFileSystem().getFileSystem();
     // Make sure each region's cache is empty
-    a.flushcache();
-    b.flushcache();
+    a.flushcache(true);
+    b.flushcache(true);
 
     // Compact each region so we only have one store file per family
     a.compactStores(true);
@@ -5172,7 +5285,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
     // do after lock
     if (this.metricsRegion != null) {
-      long totalSize = 0l;
+      long totalSize = 0L;
       for (Cell cell : results) {
         totalSize += CellUtil.estimatedSerializedSizeOf(cell);
       }
@@ -5340,7 +5453,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
           }
-
           // 9. Release region lock
           if (locked) {
             this.updatesLock.readLock().unlock();
@@ -5468,7 +5580,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     WALEdit walEdits = null;
     List<Cell> allKVs = new ArrayList<Cell>(append.size());
     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
-
     long size = 0;
     long txid = 0;
 
@@ -5671,7 +5782,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
             walKey = this.appendEmptyEdit(this.wal, memstoreCells);
           }
-
           size = this.addAndGetGlobalMemstoreSize(size);
           flush = isFlushSize(size);
         } finally {
@@ -5968,8 +6078,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      42 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (12 * Bytes.SIZEOF_LONG) +
+      44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      (11 * Bytes.SIZEOF_LONG) +
       4 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:
@@ -6539,6 +6649,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     return this.maxSeqIdInStores;
   }
 
+  @VisibleForTesting
+  public long getOldestSeqIdOfStore(byte[] familyName) {
+    return wal.getEarliestMemstoreSeqNum(getRegionInfo()
+        .getEncodedNameAsBytes(), familyName);
+  }
+
   /**
    * @return if a given region is in compaction now.
    */

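The HRegion changes split the flush entry point: flushcache(true) forces all stores, while flushcache(false) defers to the configured FlushPolicy. A test-style fragment, assuming an already-initialized HRegion named region in the regionserver package, as in the tests this patch adds:

// Policy-driven flush: FlushPolicy.selectStoresToFlush() picks the stores.
HRegion.FlushResult selective = region.flushcache(false);
// Forced flush of every store, e.g. what LogRoller now requests.
HRegion.FlushResult full = region.flushcache(true);
System.out.println("selective flush succeeded: " + selective.isFlushSucceeded());
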
http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index ddeacd3..873168c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1379,7 +1379,7 @@ public class HRegionServer extends HasThread implements
       .setWriteRequestsCount(r.writeRequestsCount.get())
       .setTotalCompactingKVs(totalCompactingKVs)
       .setCurrentCompactedKVs(currentCompactedKVs)
-      .setCompleteSequenceId(r.lastFlushSeqId)
+      .setCompleteSequenceId(r.maxFlushedSeqId)
       .setDataLocality(dataLocality);
 
     return regionLoadBldr.build();
@@ -1475,7 +1475,7 @@ public class HRegionServer extends HasThread implements
             //Throttle the flushes by putting a delay. If we don't throttle, and there
             //is a balanced write-load on the regions in a table, we might end up
             //overwhelming the filesystem with too many flushes at once.
-            requester.requestDelayedFlush(r, randomDelay);
+            requester.requestDelayedFlush(r, randomDelay, false);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index aa5998b..0c5af84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -170,7 +170,8 @@ class LogRoller extends HasThread {
     if (r != null) {
       requester = this.services.getFlushRequester();
       if (requester != null) {
-        requester.requestFlush(r);
+        // force flushing all stores to clean up old logs
+        requester.requestFlush(r, true);
         scheduled = true;
       }
     }
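
The one-line comment in this hunk carries the key invariant of the patch, so it is worth spelling out: a WAL file can be archived only when no store still holds edits from it in memory, and a selective flush could leave one small family pinning an old file indefinitely. A sketch of the call (the wrapper method is illustrative):

  import org.apache.hadoop.hbase.regionserver.FlushRequester;
  import org.apache.hadoop.hbase.regionserver.HRegion;

  static void flushToFreeWal(FlushRequester requester, HRegion r) {
    // WAL-driven flushes must cover every store; true disables the
    // per-column-family store selection for this request.
    requester.requestFlush(r, true);
  }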

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index b2820dd..87821ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -39,17 +39,17 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.htrace.Trace;
 import org.htrace.TraceScope;
 import org.apache.hadoop.hbase.util.Counter;
@@ -105,20 +105,20 @@ class MemStoreFlusher implements FlushRequester {
     long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
     float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
     this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
-    this.globalMemStoreLimitLowMarkPercent = 
+    this.globalMemStoreLimitLowMarkPercent =
         HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, globalMemStorePercent);
-    this.globalMemStoreLimitLowMark = 
+    this.globalMemStoreLimitLowMark =
         (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
 
     this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime",
       90000);
     int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
     this.flushHandlers = new FlushHandler[handlerCount];
-    LOG.info("globalMemStoreLimit=" +
-      StringUtils.humanReadableInt(this.globalMemStoreLimit) +
-      ", globalMemStoreLimitLowMark=" +
-      StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) +
-      ", maxHeap=" + StringUtils.humanReadableInt(max));
+    LOG.info("globalMemStoreLimit="
+        + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1)
+        + ", globalMemStoreLimitLowMark="
+        + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1)
+        + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1));
   }
 
   public Counter getUpdatesBlockedMsHighWater() {
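
The logging change in the hunk above swaps the deprecated StringUtils.humanReadableInt for TraditionalBinaryPrefix.long2String. A self-contained demo of the formatter (the class and sample value are illustrative):

  import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;

  public class BinaryPrefixDemo {
    public static void main(String[] args) {
      long bytes = 1395864371L; // roughly 1.3 GiB
      // long2String(value, unit, decimalPlaces) picks the largest
      // binary prefix; this prints "1.3 G".
      System.out.println(TraditionalBinaryPrefix.long2String(bytes, "", 1));
    }
  }
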
@@ -160,13 +160,12 @@ class MemStoreFlusher implements FlushRequester {
         // lots of little flushes and cause lots of compactions, etc, which just makes
         // life worse!
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Under global heap pressure: " +
-            "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
-            "store files, but is " +
-            StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
-            " vs best flushable region's " +
-            StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
-            ". Choosing the bigger.");
+          LOG.debug("Under global heap pressure: " + "Region "
+              + bestAnyRegion.getRegionNameAsString() + " has too many " + "store files, but is "
+              + TraditionalBinaryPrefix.long2String(bestAnyRegion.memstoreSize.get(), "", 1)
+              + " vs best flushable region's "
+              + TraditionalBinaryPrefix.long2String(bestFlushableRegion.memstoreSize.get(), "", 1)
+              + ". Choosing the bigger.");
         }
         regionToFlush = bestAnyRegion;
       } else {
@@ -180,7 +179,7 @@ class MemStoreFlusher implements FlushRequester {
       Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
 
       LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
-      flushedOne = flushRegion(regionToFlush, true);
+      flushedOne = flushRegion(regionToFlush, true, true);
       if (!flushedOne) {
         LOG.info("Excluding unflushable region " + regionToFlush +
           " - trying to find a different region to flush.");
@@ -206,7 +205,7 @@ class MemStoreFlusher implements FlushRequester {
           if (fqe == null || fqe instanceof WakeupFlushThread) {
             if (isAboveLowWaterMark()) {
               LOG.debug("Flush thread woke up because memory above low water="
-                  + StringUtils.humanReadableInt(globalMemStoreLimitLowMark));
+                  + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
               if (!flushOneForGlobalPressure()) {
                 // Wasn't able to flush any region, but we're above low water mark
                 // This is unlikely to happen, but might happen when closing the
@@ -293,23 +292,23 @@ class MemStoreFlusher implements FlushRequester {
       getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
   }
 
-  public void requestFlush(HRegion r) {
+  public void requestFlush(HRegion r, boolean forceFlushAllStores) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
         // queue.  It'll come out near immediately.
-        FlushRegionEntry fqe = new FlushRegionEntry(r);
+        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
       }
     }
   }
 
-  public void requestDelayedFlush(HRegion r, long delay) {
+  public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has some delay
-        FlushRegionEntry fqe = new FlushRegionEntry(r);
+        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
         fqe.requeue(delay);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
@@ -362,7 +361,7 @@ class MemStoreFlusher implements FlushRequester {
     }
   }
 
-  /*
+  /**
    * A flushRegion that checks store file count.  If too many, puts the flush
    * on delay queue to retry later.
    * @param fqe
@@ -404,22 +403,23 @@ class MemStoreFlusher implements FlushRequester {
         return true;
       }
     }
-    return flushRegion(region, false);
+    return flushRegion(region, false, fqe.isForceFlushAllStores());
   }
 
-  /*
+  /**
    * Flush a region.
    * @param region Region to flush.
    * @param emergencyFlush Set if we are being force flushed. If true the region
    * needs to be removed from the flush queue. If false, when we were called
    * from the main flusher run loop and we got the entry to flush by calling
    * poll on the flush queue (which removed it).
-   *
+   * @param forceFlushAllStores whether we want to flush all stores.
    * @return true if the region was successfully flushed, false otherwise. If
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
    */
-  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
+  private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
+      boolean forceFlushAllStores) {
     long startTime = 0;
     synchronized (this.regionsInQueue) {
       FlushRegionEntry fqe = this.regionsInQueue.remove(region);
@@ -442,7 +442,7 @@ class MemStoreFlusher implements FlushRequester {
     lock.readLock().lock();
     try {
       notifyFlushRequest(region, emergencyFlush);
-      HRegion.FlushResult flushResult = region.flushcache();
+      HRegion.FlushResult flushResult = region.flushcache(forceFlushAllStores);
       boolean shouldCompact = flushResult.isCompactionNeeded();
       // We just want to check the size
       boolean shouldSplit = region.checkSplit() != null;
@@ -524,11 +524,12 @@ class MemStoreFlusher implements FlushRequester {
           while (isAboveHighWaterMark() && !server.isStopped()) {
             if (!blocked) {
               startTime = EnvironmentEdgeManager.currentTime();
-              LOG.info("Blocking updates on " + server.toString() +
-                ": the global memstore size " +
-                StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
-                " is >= than blocking " +
-                StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
+              LOG.info("Blocking updates on "
+                  + server.toString()
+                  + ": the global memstore size "
+                  + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
+                      .getGlobalMemstoreSize(), "", 1) + " is >= blocking "
+                  + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
             }
             blocked = true;
             wakeupFlushThread();
@@ -605,7 +606,7 @@ class MemStoreFlusher implements FlushRequester {
    */
   public void setGlobalMemstoreLimit(long globalMemStoreSize) {
     this.globalMemStoreLimit = globalMemStoreSize;
-    this.globalMemStoreLimitLowMark = 
+    this.globalMemStoreLimitLowMark =
         (long) (this.globalMemStoreLimitLowMarkPercent * globalMemStoreSize);
     reclaimMemStoreMemory();
   }
@@ -652,10 +653,13 @@ class MemStoreFlusher implements FlushRequester {
     private long whenToExpire;
     private int requeueCount = 0;
 
-    FlushRegionEntry(final HRegion r) {
+    private boolean forceFlushAllStores;
+
+    FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) {
       this.region = r;
       this.createTime = EnvironmentEdgeManager.currentTime();
       this.whenToExpire = this.createTime;
+      this.forceFlushAllStores = forceFlushAllStores;
     }
 
     /**
@@ -675,6 +679,13 @@ class MemStoreFlusher implements FlushRequester {
     }
 
     /**
+     * @return whether we need to flush all stores.
+     */
+    public boolean isForceFlushAllStores() {
+      return forceFlushAllStores;
+    }
+
+    /**
      * @param when When to expire, when to come up out of the queue.
      * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTime()
      * to whatever you pass.
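
Taken together, these MemStoreFlusher hunks thread one boolean from the request down to region.flushcache(forceFlushAllStores). When it is false, a flush policy decides which stores are worth flushing; a hedged sketch of that selection idea (the threshold parameter and method shape are illustrative — see FlushLargeStoresPolicy in this patch for the real decision):

  import java.util.ArrayList;
  import java.util.Collection;

  import org.apache.hadoop.hbase.regionserver.HRegion;
  import org.apache.hadoop.hbase.regionserver.Store;

  static Collection<Store> selectStoresToFlush(HRegion region, long lowerBound) {
    Collection<Store> picked = new ArrayList<Store>();
    for (Store store : region.getStores().values()) {
      // Only stores with a sizeable memstore repay the cost of the
      // extra HFile a flush creates.
      if (store.getMemStoreSize() > lowerBound) {
        picked.add(store);
      }
    }
    // If nothing stands out, a selective flush would free almost no
    // memory, so fall back to flushing everything.
    return picked.isEmpty() ? region.getStores().values() : picked;
  }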

http://git-wip-us.apache.org/repos/asf/hbase/blob/e55ef7a6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 06e51c6..fec3030 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -72,7 +72,6 @@ import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
@@ -150,8 +149,6 @@ import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -159,6 +156,8 @@ import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.net.DNS;
 import org.apache.zookeeper.KeeperException;
@@ -688,7 +687,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    */
   private OperationStatus [] doReplayBatchOp(final HRegion region,
       final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
-
     long before = EnvironmentEdgeManager.currentTime();
     boolean batchContainsPuts = false, batchContainsDelete = false;
     try {
@@ -1069,7 +1067,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       LOG.info("Flushing " + region.getRegionNameAsString());
       boolean shouldFlush = true;
       if (request.hasIfOlderThanTs()) {
-        shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
+        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
       }
       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
       if (shouldFlush) {
@@ -1086,7 +1084,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
         builder.setFlushed(result);
       }
-      builder.setLastFlushTime(region.getLastFlushTime());
+      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
       return builder.build();
     } catch (DroppedSnapshotException ex) {
       // Cache flush can fail in a few places. If it fails in a critical
@@ -2123,7 +2121,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           } else {
             addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
           }
-        } finally { 
+        } finally {
           // We're done. On way out re-add the above removed lease.
           // Adding resets expiration time on lease.
           if (scanners.containsKey(scannerName)) {
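
The two RSRpcServices changes above follow directly from per-family flushing: a region no longer has a single "last flush time", so the ifOlderThanTs guard must compare against the earliest flush time across all stores, or a family that has not flushed since that timestamp could be skipped. An illustrative reduction (the per-store timestamp map is an assumption about the state this patch tracks, not code from it):

  import java.util.Map;

  import org.apache.hadoop.hbase.regionserver.Store;

  static long earliestFlushTimeForAllStores(Map<Store, Long> lastStoreFlushTimeMap) {
    long earliest = Long.MAX_VALUE;
    // The region is only as fresh as its least recently flushed store.
    for (long ts : lastStoreFlushTimeMap.values()) {
      earliest = Math.min(earliest, ts);
    }
    return earliest;
  }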

