hbase-commits mailing list archives

From st...@apache.org
Subject [3/3] hbase git commit: HBASE-14920: Compacting memstore
Date Fri, 20 May 2016 10:42:05 GMT
HBASE-14920: Compacting memstore

Signed-off-by: stack <stack@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a27504c7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a27504c7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a27504c7

Branch: refs/heads/master
Commit: a27504c70181ec3585033eaee2523184c40a144f
Parents: af5146e
Author: eshcar <eshcar@yahoo-inc.com>
Authored: Mon May 16 15:50:20 2016 +0300
Committer: stack <stack@apache.org>
Committed: Fri May 20 03:41:43 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/HColumnDescriptor.java  |  83 ++-
 .../test/IntegrationTestBigLinkedList.java      |   4 +-
 .../hbase/regionserver/AbstractMemStore.java    |  33 +-
 .../hbase/regionserver/CompactingMemStore.java  | 406 +++++++++++
 .../hbase/regionserver/CompactionPipeline.java  | 190 +++++
 .../hbase/regionserver/DefaultMemStore.java     |  31 +-
 .../regionserver/FlushAllLargeStoresPolicy.java |  75 ++
 .../regionserver/FlushLargeStoresPolicy.java    |  54 +-
 .../FlushNonSloppyStoresFirstPolicy.java        |  66 ++
 .../hbase/regionserver/FlushPolicyFactory.java  |   2 +-
 .../hadoop/hbase/regionserver/HMobStore.java    |   4 +
 .../hadoop/hbase/regionserver/HRegion.java      |  70 +-
 .../hadoop/hbase/regionserver/HStore.java       |  17 +-
 .../hbase/regionserver/ImmutableSegment.java    |   4 -
 .../hadoop/hbase/regionserver/MemStore.java     |  14 +-
 .../hbase/regionserver/MemStoreCompactor.java   | 197 +++++
 .../regionserver/RegionServicesForStores.java   |  49 +-
 .../hadoop/hbase/regionserver/Segment.java      |  34 +-
 .../hbase/regionserver/SegmentFactory.java      |   6 +-
 .../hbase/regionserver/SegmentScanner.java      |   5 +-
 .../apache/hadoop/hbase/regionserver/Store.java |   1 +
 .../regionserver/VersionedSegmentsList.java     |  54 ++
 .../hbase/regionserver/wal/AbstractFSWAL.java   |  19 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java   |  18 +-
 .../regionserver/wal/SequenceIdAccounting.java  |  35 +-
 .../hadoop/hbase/wal/DisabledWALProvider.java   |   8 +-
 .../java/org/apache/hadoop/hbase/wal/WAL.java   |  15 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |  51 +-
 .../org/apache/hadoop/hbase/TestIOFencing.java  |   5 +
 .../apache/hadoop/hbase/io/TestHeapSize.java    |  12 +-
 .../regionserver/TestCompactingMemStore.java    | 729 +++++++++++++++++++
 .../hbase/regionserver/TestDefaultMemStore.java | 159 ++--
 .../hadoop/hbase/regionserver/TestHRegion.java  |  63 +-
 .../TestHRegionWithInMemoryFlush.java           |  61 ++
 .../regionserver/TestPerColumnFamilyFlush.java  |  29 +-
 .../TestWalAndCompactingMemStoreFlush.java      | 565 ++++++++++++++
 hbase-shell/src/main/ruby/hbase.rb              |   1 +
 hbase-shell/src/main/ruby/hbase/admin.rb        |   1 +
 38 files changed, 2863 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
index 3c16f4e..3153430 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -38,7 +40,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.PrettyPrinter;
 import org.apache.hadoop.hbase.util.PrettyPrinter.Unit;
 
-import com.google.common.base.Preconditions;
 
 /**
  * An HColumnDescriptor contains information about a column family such as the
@@ -62,6 +63,8 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
   // Version 11 -- add column family level configuration.
   private static final byte COLUMN_DESCRIPTOR_VERSION = (byte) 11;
 
+  private static final String IN_MEMORY_COMPACTION = "IN_MEMORY_COMPACTION";
+
   // These constants are used as FileInfo keys
   public static final String COMPRESSION = "COMPRESSION";
   public static final String COMPRESSION_COMPACT = "COMPRESSION_COMPACT";
@@ -151,7 +154,7 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
    * Default number of versions of a record to keep.
    */
   public static final int DEFAULT_VERSIONS = HBaseConfiguration.create().getInt(
-    "hbase.column.max.version", 1);
+      "hbase.column.max.version", 1);
 
   /**
    * Default is not to keep a minimum of versions.
@@ -170,6 +173,11 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
   public static final boolean DEFAULT_IN_MEMORY = false;
 
   /**
+   * Default setting for whether the memstore of this column family uses in-memory compaction.
+   */
+  public static final boolean DEFAULT_IN_MEMORY_COMPACTION = false;
+
+  /**
    * Default setting for preventing deleted from being collected immediately.
    */
   public static final KeepDeletedCells DEFAULT_KEEP_DELETED = KeepDeletedCells.FALSE;
@@ -246,30 +254,31 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
       = new HashSet<Bytes>();
 
   static {
-      DEFAULT_VALUES.put(BLOOMFILTER, DEFAULT_BLOOMFILTER);
-      DEFAULT_VALUES.put(REPLICATION_SCOPE, String.valueOf(DEFAULT_REPLICATION_SCOPE));
-      DEFAULT_VALUES.put(HConstants.VERSIONS, String.valueOf(DEFAULT_VERSIONS));
-      DEFAULT_VALUES.put(MIN_VERSIONS, String.valueOf(DEFAULT_MIN_VERSIONS));
-      DEFAULT_VALUES.put(COMPRESSION, DEFAULT_COMPRESSION);
-      DEFAULT_VALUES.put(TTL, String.valueOf(DEFAULT_TTL));
-      DEFAULT_VALUES.put(BLOCKSIZE, String.valueOf(DEFAULT_BLOCKSIZE));
-      DEFAULT_VALUES.put(HConstants.IN_MEMORY, String.valueOf(DEFAULT_IN_MEMORY));
-      DEFAULT_VALUES.put(BLOCKCACHE, String.valueOf(DEFAULT_BLOCKCACHE));
-      DEFAULT_VALUES.put(KEEP_DELETED_CELLS, String.valueOf(DEFAULT_KEEP_DELETED));
-      DEFAULT_VALUES.put(DATA_BLOCK_ENCODING, String.valueOf(DEFAULT_DATA_BLOCK_ENCODING));
-      DEFAULT_VALUES.put(CACHE_DATA_ON_WRITE, String.valueOf(DEFAULT_CACHE_DATA_ON_WRITE));
-      DEFAULT_VALUES.put(CACHE_DATA_IN_L1, String.valueOf(DEFAULT_CACHE_DATA_IN_L1));
-      DEFAULT_VALUES.put(CACHE_INDEX_ON_WRITE, String.valueOf(DEFAULT_CACHE_INDEX_ON_WRITE));
-      DEFAULT_VALUES.put(CACHE_BLOOMS_ON_WRITE, String.valueOf(DEFAULT_CACHE_BLOOMS_ON_WRITE));
-      DEFAULT_VALUES.put(EVICT_BLOCKS_ON_CLOSE, String.valueOf(DEFAULT_EVICT_BLOCKS_ON_CLOSE));
-      DEFAULT_VALUES.put(PREFETCH_BLOCKS_ON_OPEN, String.valueOf(DEFAULT_PREFETCH_BLOCKS_ON_OPEN));
-      for (String s : DEFAULT_VALUES.keySet()) {
-        RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s)));
-      }
-      RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION)));
-      RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY)));
-      RESERVED_KEYWORDS.add(new Bytes(IS_MOB_BYTES));
-      RESERVED_KEYWORDS.add(new Bytes(MOB_THRESHOLD_BYTES));
+    DEFAULT_VALUES.put(BLOOMFILTER, DEFAULT_BLOOMFILTER);
+    DEFAULT_VALUES.put(REPLICATION_SCOPE, String.valueOf(DEFAULT_REPLICATION_SCOPE));
+    DEFAULT_VALUES.put(HConstants.VERSIONS, String.valueOf(DEFAULT_VERSIONS));
+    DEFAULT_VALUES.put(MIN_VERSIONS, String.valueOf(DEFAULT_MIN_VERSIONS));
+    DEFAULT_VALUES.put(COMPRESSION, DEFAULT_COMPRESSION);
+    DEFAULT_VALUES.put(TTL, String.valueOf(DEFAULT_TTL));
+    DEFAULT_VALUES.put(BLOCKSIZE, String.valueOf(DEFAULT_BLOCKSIZE));
+    DEFAULT_VALUES.put(HConstants.IN_MEMORY, String.valueOf(DEFAULT_IN_MEMORY));
+    DEFAULT_VALUES.put(IN_MEMORY_COMPACTION, String.valueOf(DEFAULT_IN_MEMORY_COMPACTION));
+    DEFAULT_VALUES.put(BLOCKCACHE, String.valueOf(DEFAULT_BLOCKCACHE));
+    DEFAULT_VALUES.put(KEEP_DELETED_CELLS, String.valueOf(DEFAULT_KEEP_DELETED));
+    DEFAULT_VALUES.put(DATA_BLOCK_ENCODING, String.valueOf(DEFAULT_DATA_BLOCK_ENCODING));
+    DEFAULT_VALUES.put(CACHE_DATA_ON_WRITE, String.valueOf(DEFAULT_CACHE_DATA_ON_WRITE));
+    DEFAULT_VALUES.put(CACHE_DATA_IN_L1, String.valueOf(DEFAULT_CACHE_DATA_IN_L1));
+    DEFAULT_VALUES.put(CACHE_INDEX_ON_WRITE, String.valueOf(DEFAULT_CACHE_INDEX_ON_WRITE));
+    DEFAULT_VALUES.put(CACHE_BLOOMS_ON_WRITE, String.valueOf(DEFAULT_CACHE_BLOOMS_ON_WRITE));
+    DEFAULT_VALUES.put(EVICT_BLOCKS_ON_CLOSE, String.valueOf(DEFAULT_EVICT_BLOCKS_ON_CLOSE));
+    DEFAULT_VALUES.put(PREFETCH_BLOCKS_ON_OPEN, String.valueOf(DEFAULT_PREFETCH_BLOCKS_ON_OPEN));
+    for (String s : DEFAULT_VALUES.keySet()) {
+      RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(s)));
+    }
+    RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION)));
+    RESERVED_KEYWORDS.add(new Bytes(Bytes.toBytes(ENCRYPTION_KEY)));
+    RESERVED_KEYWORDS.add(new Bytes(IS_MOB_BYTES));
+    RESERVED_KEYWORDS.add(new Bytes(MOB_THRESHOLD_BYTES));
   }
 
   private static final int UNINITIALIZED = -1;
@@ -319,6 +328,7 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
     setMinVersions(DEFAULT_MIN_VERSIONS);
     setKeepDeletedCells(DEFAULT_KEEP_DELETED);
     setInMemory(DEFAULT_IN_MEMORY);
+    setInMemoryCompaction(DEFAULT_IN_MEMORY_COMPACTION);
     setBlockCacheEnabled(DEFAULT_BLOCKCACHE);
     setTimeToLive(DEFAULT_TTL);
     setCompressionType(Compression.Algorithm.valueOf(DEFAULT_COMPRESSION.toUpperCase()));
@@ -676,6 +686,27 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
     return setValue(HConstants.IN_MEMORY, Boolean.toString(inMemory));
   }
 
+  /**
+   * @return True if we prefer to keep the in-memory data compacted
+   *          for this column family
+   */
+  public boolean isInMemoryCompaction() {
+    String value = getValue(IN_MEMORY_COMPACTION);
+    if (value != null) {
+      return Boolean.parseBoolean(value);
+    }
+    return DEFAULT_IN_MEMORY_COMPACTION;
+  }
+
+  /**
+   * @param inMemoryCompaction True if we prefer to keep the in-memory data compacted
+   *                  for this column family
+   * @return this (for chained invocation)
+   */
+  public HColumnDescriptor setInMemoryCompaction(boolean inMemoryCompaction) {
+    return setValue(IN_MEMORY_COMPACTION, Boolean.toString(inMemoryCompaction));
+  }
+
   public KeepDeletedCells getKeepDeletedCells() {
     String value = getValue(KEEP_DELETED_CELLS);
     if (value != null) {

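For context, a minimal usage sketch of the new per-family flag (the Admin instance, table name and
family name below are illustrative assumptions, not part of this patch):

    // Enable in-memory compaction for family "cf"; DEFAULT_IN_MEMORY_COMPACTION is false.
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t"));
    HColumnDescriptor hcd = new HColumnDescriptor("cf");
    hcd.setInMemoryCompaction(true);
    htd.addFamily(hcd);
    admin.createTable(htd);   // 'admin' is an existing org.apache.hadoop.hbase.client.Admin
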
http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index c864580..430c8a6 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -80,7 +80,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
-import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushAllLargeStoresPolicy;
 import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
@@ -1586,7 +1586,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase {
     Configuration conf = getTestingUtil(getConf()).getConfiguration();
     if (isMultiUnevenColumnFamilies(getConf())) {
       // make sure per CF flush is on
-      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+      conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllLargeStoresPolicy.class.getName());
     }
     int ret =
         ToolRunner.run(conf, new Loop(), new String[] { "1", "1", "2000000",

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index c3724fc..0f27e0e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -64,7 +64,7 @@ public abstract class AbstractMemStore implements MemStore {
           (2 * Bytes.SIZEOF_LONG));
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
-      2 * (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER +
+      (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER +
       ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP));
 
 
@@ -99,7 +99,7 @@ public abstract class AbstractMemStore implements MemStore {
    * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
    *                      only if it is greater than the previous sequence id
    */
-  public abstract void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent);
+  public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
 
   /**
    * Write an update
@@ -162,17 +162,9 @@ public abstract class AbstractMemStore implements MemStore {
   }
 
   /**
-   * An override on snapshot so the no arg version of the method implies zero seq num,
-   * like for cases without wal
-   */
-  public MemStoreSnapshot snapshot() {
-    return snapshot(0);
-  }
-
-  /**
    * The passed snapshot was successfully persisted; it can be let go.
    * @param id Id of the snapshot to clean out.
-   * @see MemStore#snapshot(long)
+   * @see MemStore#snapshot()
    */
   @Override
   public void clearSnapshot(long id) throws UnexpectedStateException {
@@ -201,18 +193,6 @@ public abstract class AbstractMemStore implements MemStore {
   }
 
   /**
-   * On flush, how much memory we will clear from the active cell set.
-   *
-   * @return size of data that is going to be flushed from active set
-   */
-  @Override
-  public long getFlushableSize() {
-    long snapshotSize = getSnapshot().getSize();
-    return snapshotSize > 0 ? snapshotSize : keySize();
-  }
-
-
-  /**
    * @return a list containing a single memstore scanner.
    */
   @Override
@@ -230,7 +210,7 @@ public abstract class AbstractMemStore implements MemStore {
     StringBuffer buf = new StringBuffer();
     int i = 1;
     try {
-      for (Segment segment : getListOfSegments()) {
+      for (Segment segment : getSegments()) {
         buf.append("Segment (" + i + ") " + segment.toString() + "; ");
         i++;
       }
@@ -471,9 +451,6 @@ public abstract class AbstractMemStore implements MemStore {
    * Returns an ordered list of segments from most recent to oldest in memstore
    * @return an ordered list of segments from most recent to oldest in memstore
    */
-  protected abstract List<Segment> getListOfSegments() throws IOException;
+  protected abstract List<Segment> getSegments() throws IOException;
 
-  public long getActiveSize() {
-    return getActive().getSize();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
new file mode 100644
index 0000000..7aaece6
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -0,0 +1,406 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL;
+
+/**
+ * A memstore implementation which supports in-memory compaction.
+ * A compaction pipeline is added between the active set and the snapshot data structures;
+ * it consists of a list of kv-sets that are subject to compaction.
+ * Like the snapshot, all pipeline components are read-only; updates only affect the active set.
+ * To ensure this property we take advantage of the existing blocking mechanism -- the active set
+ * is pushed to the pipeline while holding the region's updatesLock in exclusive mode.
+ * Periodically, a compaction is applied in the background to all pipeline components resulting
+ * in a single read-only component. The ``old'' components are discarded when no scanner is reading
+ * them.
+ */
+@InterfaceAudience.Private
+public class CompactingMemStore extends AbstractMemStore {
+  public final static long DEEP_OVERHEAD_PER_PIPELINE_ITEM = ClassSize.align(
+      ClassSize.TIMERANGE_TRACKER + ClassSize.TIMERANGE +
+          ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP);
+  // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
+  public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
+      "hbase.memestore.inmemoryflush.threshold.factor";
+  private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.25;
+
+  private static final Log LOG = LogFactory.getLog(CompactingMemStore.class);
+  private Store store;
+  private RegionServicesForStores regionServices;
+  private CompactionPipeline pipeline;
+  private MemStoreCompactor compactor;
+  // the threshold on active size for in-memory flush
+  private long inmemoryFlushSize;
+  private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
+  @VisibleForTesting
+  private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
+
+  public CompactingMemStore(Configuration conf, CellComparator c,
+      HStore store, RegionServicesForStores regionServices) throws IOException {
+    super(conf, c);
+    this.store = store;
+    this.regionServices = regionServices;
+    this.pipeline = new CompactionPipeline(getRegionServices());
+    this.compactor = new MemStoreCompactor(this);
+    initInmemoryFlushSize(conf);
+  }
+
+  private void initInmemoryFlushSize(Configuration conf) {
+    long memstoreFlushSize = getRegionServices().getMemstoreFlushSize();
+    int numStores = getRegionServices().getNumStores();
+    if (numStores <= 1) {
+      // Family number might also be zero in some of our unit test cases
+      numStores = 1;
+    }
+    inmemoryFlushSize = memstoreFlushSize / numStores;
+    // multiply by a factor
+    double factor =  conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
+        IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT);
+    inmemoryFlushSize *= factor;
+    LOG.debug("Setting in-memory flush size threshold to " + inmemoryFlushSize);
+  }
+
+  public static long getSegmentSize(Segment segment) {
+    return segment.getSize() - DEEP_OVERHEAD_PER_PIPELINE_ITEM;
+  }
+
+  public static long getSegmentsSize(List<? extends Segment> list) {
+    long res = 0;
+    for (Segment segment : list) {
+      res += getSegmentSize(segment);
+    }
+    return res;
+  }
+
+  /**
+   * @return Total memory occupied by this MemStore.
+   * This is not thread safe and the memstore may be changed while computing its size.
+   * It is the responsibility of the caller to make sure this doesn't happen.
+   */
+  @Override
+  public long size() {
+    long res = 0;
+    for (Segment item : getSegments()) {
+      res += item.getSize();
+    }
+    return res;
+  }
+
+  /**
+   * This method is called when it is clear that the flush to disk is completed.
+   * The store may do any post-flush actions at this point.
+   * One example is to update the WAL with a sequence number that is known only at the store level.
+   */
+  @Override public void finalizeFlush() {
+    updateLowestUnflushedSequenceIdInWAL(false);
+  }
+
+  @Override public boolean isSloppy() {
+    return true;
+  }
+
+  /**
+   * Push the current active memstore segment into the pipeline
+   * and create a snapshot of the tail of the current compaction pipeline.
+   * The snapshot must be cleared by a call to {@link #clearSnapshot(long)}.
+   * @return {@link MemStoreSnapshot}
+   */
+  @Override
+  public MemStoreSnapshot snapshot() {
+    MutableSegment active = getActive();
+    // If snapshot currently has entries, then flusher failed or didn't call
+    // cleanup.  Log a warning.
+    if (!getSnapshot().isEmpty()) {
+      LOG.warn("Snapshot called again without clearing previous. " +
+          "Doing nothing. Another ongoing flush or did we fail last attempt?");
+    } else {
+      LOG.info("FLUSHING TO DISK: region "+ getRegionServices().getRegionInfo()
+          .getRegionNameAsString() + "store: "+ getFamilyName());
+      stopCompaction();
+      pushActiveToPipeline(active);
+      snapshotId = EnvironmentEdgeManager.currentTime();
+      pushTailToSnapshot();
+    }
+    return new MemStoreSnapshot(snapshotId, getSnapshot());
+  }
+
+  /**
+   * On flush, how much memory we will clear.
+   * @return size of data that is going to be flushed
+   */
+  @Override public long getFlushableSize() {
+    long snapshotSize = getSnapshot().getSize();
+    if(snapshotSize == 0) {
+      //if snapshot is empty the tail of the pipeline is flushed
+      snapshotSize = pipeline.getTailSize();
+    }
+    return snapshotSize > 0 ? snapshotSize : keySize();
+  }
+
+  @Override
+  public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) {
+    long minSequenceId = pipeline.getMinSequenceId();
+    if(minSequenceId != Long.MAX_VALUE) {
+      byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
+      byte[] familyName = getFamilyNameInByte();
+      WAL WAL = getRegionServices().getWAL();
+      if (WAL != null) {
+        WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater);
+      }
+    }
+  }
+
+  @Override
+  public List<Segment> getSegments() {
+    List<Segment> pipelineList = pipeline.getSegments();
+    List<Segment> list = new LinkedList<Segment>();
+    list.add(getActive());
+    list.addAll(pipelineList);
+    list.add(getSnapshot());
+    return list;
+  }
+
+  public void setInMemoryFlushInProgress(boolean inMemoryFlushInProgress) {
+    this.inMemoryFlushInProgress.set(inMemoryFlushInProgress);
+  }
+
+  public void swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result) {
+    pipeline.swap(versionedList, result);
+  }
+
+  public boolean hasCompactibleSegments() {
+    return !pipeline.isEmpty();
+  }
+
+  public VersionedSegmentsList getCompactibleSegments() {
+    return pipeline.getVersionedList();
+  }
+
+  public long getSmallestReadPoint() {
+    return store.getSmallestReadPoint();
+  }
+
+  public Store getStore() {
+    return store;
+  }
+
+  public String getFamilyName() {
+    return Bytes.toString(getFamilyNameInByte());
+  }
+
+  @Override
+  /*
+   * Scanners are ordered from 0 (oldest) to newest in increasing order.
+   */
+  protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException {
+    List<Segment> pipelineList = pipeline.getSegments();
+    long order = pipelineList.size();
+    LinkedList<SegmentScanner> list = new LinkedList<SegmentScanner>();
+    list.add(getActive().getSegmentScanner(readPt, order+1));
+    for (Segment item : pipelineList) {
+      list.add(item.getSegmentScanner(readPt, order));
+      order--;
+    }
+    list.add(getSnapshot().getSegmentScanner(readPt, order));
+    return list;
+  }
+
+  /**
+   * Checks whether anything needs to be done based on the current active set size.
+   * The method is invoked upon every addition to the active set.
+   * For CompactingMemStore, the active set is flushed to read-only memory if its
+   * size is above the threshold.
+   */
+  @Override
+  protected void checkActiveSize() {
+    if (shouldFlushInMemory()) {
+      /* The thread is dispatched to flush-in-memory. This cannot be done
+      * on the same thread, because for flush-in-memory we require updatesLock
+      * in exclusive mode while this method (checkActiveSize) is invoked holding updatesLock
+      * in the shared mode. */
+      InMemoryFlushRunnable runnable = new InMemoryFlushRunnable();
+      LOG.info("Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName());
+      getPool().execute(runnable);
+      // guard against queuing same old compactions over and over again
+      inMemoryFlushInProgress.set(true);
+    }
+  }
+
+  // internally used method, externally visible only for tests
+  // when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
+  // otherwise there is a deadlock
+  @VisibleForTesting
+  void flushInMemory() throws IOException {
+    // Phase I: Update the pipeline
+    getRegionServices().blockUpdates();
+    try {
+      MutableSegment active = getActive();
+      LOG.info("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, " +
+          "and initiating compaction.");
+      pushActiveToPipeline(active);
+    } finally {
+      getRegionServices().unblockUpdates();
+    }
+    // Phase II: Compact the pipeline
+    try {
+      if (allowCompaction.get()) {
+        // setting the inMemoryFlushInProgress flag again for the case this method is invoked
+        // directly (only in tests) in the common path setting from true to true is idempotent
+        inMemoryFlushInProgress.set(true);
+        // Speculative compaction execution, may be interrupted if flush is forced while
+        // compaction is in progress
+        compactor.startCompaction();
+      }
+    } catch (IOException e) {
+      LOG.warn("Unable to run memstore compaction. region "
+          + getRegionServices().getRegionInfo().getRegionNameAsString()
+          + "store: "+ getFamilyName(), e);
+    }
+  }
+
+  private byte[] getFamilyNameInByte() {
+    return store.getFamily().getName();
+  }
+
+  private ThreadPoolExecutor getPool() {
+    return getRegionServices().getInMemoryCompactionPool();
+  }
+
+  private boolean shouldFlushInMemory() {
+    if(getActive().getSize() > inmemoryFlushSize) {
+      // size above flush threshold
+      return (allowCompaction.get() && !inMemoryFlushInProgress.get());
+    }
+    return false;
+  }
+
+  /**
+   * Requests cancellation of the asynchronous compaction task (started by an in-memory flush).
+   * The compaction may still happen if the request was sent too late.
+   * Non-blocking request.
+   */
+  private void stopCompaction() {
+    if (inMemoryFlushInProgress.get()) {
+      compactor.stopCompact();
+      inMemoryFlushInProgress.set(false);
+    }
+  }
+
+  private void pushActiveToPipeline(MutableSegment active) {
+    if (!active.isEmpty()) {
+      long delta = DEEP_OVERHEAD_PER_PIPELINE_ITEM - DEEP_OVERHEAD;
+      active.setSize(active.getSize() + delta);
+      pipeline.pushHead(active);
+      resetCellSet();
+    }
+  }
+
+  private void pushTailToSnapshot() {
+    ImmutableSegment tail = pipeline.pullTail();
+    if (!tail.isEmpty()) {
+      setSnapshot(tail);
+      long size = getSegmentSize(tail);
+      setSnapshotSize(size);
+    }
+  }
+
+  private RegionServicesForStores getRegionServices() {
+    return regionServices;
+  }
+
+  /**
+  * The in-memory-flusher thread performs the flush asynchronously.
+  * There is at most one thread per memstore instance.
+  * It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock
+  * and compacts the pipeline.
+  */
+  private class InMemoryFlushRunnable implements Runnable {
+
+    @Override public void run() {
+      try {
+        flushInMemory();
+      } catch (IOException e) {
+        LOG.warn("Unable to run memstore compaction. region "
+            + getRegionServices().getRegionInfo().getRegionNameAsString()
+            + "store: "+ getFamilyName(), e);
+      }
+    }
+  }
+
+  //----------------------------------------------------------------------
+  //methods for tests
+  //----------------------------------------------------------------------
+  boolean isMemStoreFlushingInMemory() {
+    return inMemoryFlushInProgress.get();
+  }
+
+  void disableCompaction() {
+    allowCompaction.set(false);
+  }
+
+  void enableCompaction() {
+    allowCompaction.set(true);
+  }
+
+  /**
+   * @param cell Find the row that comes after this one.  If null, we return the
+   *             first.
+   * @return Next row or null if none found.
+   */
+  Cell getNextRow(final Cell cell) {
+    Cell lowest = null;
+    List<Segment> segments = getSegments();
+    for (Segment segment : segments) {
+      if (lowest == null) {
+        lowest = getNextRow(cell, segment.getCellSet());
+      } else {
+        lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
+      }
+    }
+    return lowest;
+  }
+
+  // debug method
+  private void debug() {
+    String msg = "active size="+getActive().getSize();
+    msg += " threshold="+IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT* inmemoryFlushSize;
+    msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
+    msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false");
+    LOG.debug(msg);
+  }
+}
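
To make the threshold computed in initInmemoryFlushSize() concrete, here is a small worked example;
the 128 MB flush size and two stores are made-up numbers, while 0.25 is the patch's default factor:

    // Illustrative arithmetic only; 128 MB and 2 stores are assumptions.
    long memstoreFlushSize = 128L * 1024 * 1024;  // region-level flush-to-disk size
    int numStores = 2;
    double factor = 0.25;                         // IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT
    long inmemoryFlushSize = (long) ((memstoreFlushSize / numStores) * factor);
    // => 16 MB; once the active segment grows past this, it is pushed into the compaction pipeline.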

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
new file mode 100644
index 0000000..e33ceae
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -0,0 +1,190 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The compaction pipeline of a {@link CompactingMemStore} is a FIFO queue of segments.
+ * It supports pushing a segment at the head of the pipeline and pulling a segment from the
+ * tail to flush to disk.
+ * It also supports a swap operation to allow the compactor to swap a subset of the segments with a new
+ * (compacted) one. This swap succeeds only if the version number passed with the list of segments
+ * to swap is the same as the current version of the pipeline.
+ * The pipeline version is updated whenever swapping segments or pulling the segment at the tail.
+ */
+@InterfaceAudience.Private
+public class CompactionPipeline {
+  private static final Log LOG = LogFactory.getLog(CompactionPipeline.class);
+
+  private final RegionServicesForStores region;
+  private LinkedList<ImmutableSegment> pipeline;
+  private long version;
+
+  private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance()
+      .createImmutableSegment(null,
+          CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM);
+
+  public CompactionPipeline(RegionServicesForStores region) {
+    this.region = region;
+    this.pipeline = new LinkedList<ImmutableSegment>();
+    this.version = 0;
+  }
+
+  public boolean pushHead(MutableSegment segment) {
+    ImmutableSegment immutableSegment = SegmentFactory.instance().
+        createImmutableSegment(segment);
+    synchronized (pipeline){
+      return addFirst(immutableSegment);
+    }
+  }
+
+  public ImmutableSegment pullTail() {
+    synchronized (pipeline){
+      if(pipeline.isEmpty()) {
+        return EMPTY_MEM_STORE_SEGMENT;
+      }
+      return removeLast();
+    }
+  }
+
+  public VersionedSegmentsList getVersionedList() {
+    synchronized (pipeline){
+      LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline);
+      VersionedSegmentsList res = new VersionedSegmentsList(segmentList, version);
+      return res;
+    }
+  }
+
+  /**
+   * Swaps the versioned list at the tail of the pipeline with the new compacted segment.
+   * The swap is performed only if there were no changes to the suffix of the list while it was compacted.
+   * @param versionedList tail of the pipeline that was compacted
+   * @param segment new compacted segment
+   * @return true iff swapped tail with new compacted segment
+   */
+  public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment) {
+    if(versionedList.getVersion() != version) {
+      return false;
+    }
+    LinkedList<ImmutableSegment> suffix;
+    synchronized (pipeline){
+      if(versionedList.getVersion() != version) {
+        return false;
+      }
+      suffix = versionedList.getStoreSegments();
+      LOG.info("Swapping pipeline suffix with compacted item. "
+          +"Just before the swap the number of segments in pipeline is:"
+          +versionedList.getStoreSegments().size()
+          +", and the number of cells in new segment is:"+segment.getCellsCount());
+      swapSuffix(suffix,segment);
+    }
+    if(region != null) {
+      // update the global memstore size counter
+      long suffixSize = CompactingMemStore.getSegmentsSize(suffix);
+      long newSize = CompactingMemStore.getSegmentSize(segment);
+      long delta = suffixSize - newSize;
+      long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
+      LOG.info("Suffix size: "+ suffixSize+" compacted item size: "+newSize+
+          " globalMemstoreSize: "+globalMemstoreSize);
+    }
+    return true;
+  }
+
+  public boolean isEmpty() {
+    return pipeline.isEmpty();
+  }
+
+  public List<Segment> getSegments() {
+    synchronized (pipeline){
+      List<Segment> res = new LinkedList<Segment>(pipeline);
+      return res;
+    }
+  }
+
+  public long size() {
+    return pipeline.size();
+  }
+
+  public long getMinSequenceId() {
+    long minSequenceId = Long.MAX_VALUE;
+    if(!isEmpty()) {
+      minSequenceId = pipeline.getLast().getMinSequenceId();
+    }
+    return minSequenceId;
+  }
+
+  public long getTailSize() {
+    if(isEmpty()) return 0;
+    return CompactingMemStore.getSegmentSize(pipeline.peekLast());
+  }
+
+  private void swapSuffix(LinkedList<ImmutableSegment> suffix, ImmutableSegment segment) {
+    version++;
+    for(Segment itemInSuffix : suffix) {
+      itemInSuffix.close();
+    }
+    pipeline.removeAll(suffix);
+    pipeline.addLast(segment);
+  }
+
+  private ImmutableSegment removeLast() {
+    version++;
+    return pipeline.removeLast();
+  }
+
+  private boolean addFirst(ImmutableSegment segment) {
+    pipeline.add(0,segment);
+    return true;
+  }
+
+  // debug method
+  private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) {
+    if(suffix.isEmpty()) {
+      // empty suffix is always valid
+      return true;
+    }
+
+    Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator();
+    Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator();
+    ImmutableSegment suffixCurrent;
+    ImmutableSegment pipelineCurrent;
+    for( ; suffixBackwardIterator.hasNext(); ) {
+      if(!pipelineBackwardIterator.hasNext()) {
+        // a suffix longer than pipeline is invalid
+        return false;
+      }
+      suffixCurrent = suffixBackwardIterator.next();
+      pipelineCurrent = pipelineBackwardIterator.next();
+      if(suffixCurrent != pipelineCurrent) {
+        // non-matching suffix
+        return false;
+      }
+    }
+    // suffix matches pipeline suffix
+    return true;
+  }
+
+}
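
The version check in swap() is an optimistic-concurrency pattern; a simplified sketch of how a
compactor might use it (compact(...) below is a hypothetical stand-in for the actual compaction work):

    VersionedSegmentsList versionedList = pipeline.getVersionedList(); // records the current version
    ImmutableSegment compacted = compact(versionedList.getStoreSegments()); // hypothetical helper
    if (!pipeline.swap(versionedList, compacted)) {
      // The pipeline changed while compacting (e.g. its tail was pulled for a flush); drop the result.
    }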

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 3d65bca..cdc910e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -77,10 +77,9 @@ public class DefaultMemStore extends AbstractMemStore {
   /**
    * Creates a snapshot of the current memstore.
    * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
-   * @param flushOpSeqId the sequence id that is attached to the flush operation in the wal
    */
   @Override
-  public MemStoreSnapshot snapshot(long flushOpSeqId) {
+  public MemStoreSnapshot snapshot() {
     // If snapshot currently has entries, then flusher failed or didn't call
     // cleanup.  Log a warning.
     if (!getSnapshot().isEmpty()) {
@@ -90,7 +89,7 @@ public class DefaultMemStore extends AbstractMemStore {
       this.snapshotId = EnvironmentEdgeManager.currentTime();
       if (!getActive().isEmpty()) {
         ImmutableSegment immutableSegment = SegmentFactory.instance().
-            createImmutableSegment(getConfiguration(), getActive());
+            createImmutableSegment(getActive());
         setSnapshot(immutableSegment);
         setSnapshotSize(keySize());
         resetCellSet();
@@ -99,16 +98,30 @@ public class DefaultMemStore extends AbstractMemStore {
     return new MemStoreSnapshot(this.snapshotId, getSnapshot());
   }
 
+  /**
+   * On flush, how much memory we will clear from the active cell set.
+   *
+   * @return size of data that is going to be flushed from active set
+   */
+  @Override
+  public long getFlushableSize() {
+    long snapshotSize = getSnapshot().getSize();
+    return snapshotSize > 0 ? snapshotSize : keySize();
+  }
+
   @Override
+  /*
+   * Scanners are ordered from 0 (oldest) to newest in increasing order.
+   */
   protected List<SegmentScanner> getListOfScanners(long readPt) throws IOException {
     List<SegmentScanner> list = new ArrayList<SegmentScanner>(2);
-    list.add(0, getActive().getSegmentScanner(readPt));
-    list.add(1, getSnapshot().getSegmentScanner(readPt));
+    list.add(0, getActive().getSegmentScanner(readPt, 1));
+    list.add(1, getSnapshot().getSegmentScanner(readPt, 0));
     return list;
   }
 
   @Override
-  protected List<Segment> getListOfSegments() throws IOException {
+  protected List<Segment> getSegments() throws IOException {
     List<Segment> list = new ArrayList<Segment>(2);
     list.add(0, getActive());
     list.add(1, getSnapshot());
@@ -126,7 +139,7 @@ public class DefaultMemStore extends AbstractMemStore {
         getNextRow(cell, getSnapshot().getCellSet()));
   }
 
-  @Override public void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent) {
+  @Override public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent) {
   }
 
   /**
@@ -150,6 +163,10 @@ public class DefaultMemStore extends AbstractMemStore {
   public void finalizeFlush() {
   }
 
+  @Override public boolean isSloppy() {
+    return false;
+  }
+
   /**
    * Code to help figure if our approximation of object heap sizes is close
    * enough.  See hbase-900.  Fills memstores then waits so user can heap

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
new file mode 100644
index 0000000..362d0f9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A {@link FlushPolicy} that only flushes stores larger than a given threshold. If no store is large
+ * enough, then all stores will be flushed.
+ */
+public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
+
+  private static final Log LOG = LogFactory.getLog(FlushAllLargeStoresPolicy.class);
+
+  @Override
+  protected void configureForRegion(HRegion region) {
+    super.configureForRegion(region);
+    int familyNumber = region.getTableDesc().getFamilies().size();
+    if (familyNumber <= 1) {
+      // No need to parse and set flush size lower bound if only one family
+      // Family number might also be zero in some of our unit test cases
+      return;
+    }
+    this.flushSizeLowerBound = getFlushSizeLowerBound(region);
+  }
+
+  @Override
+  public Collection<Store> selectStoresToFlush() {
+    // no need to select stores if only one family
+    if (region.getTableDesc().getFamilies().size() == 1) {
+      return region.stores.values();
+    }
+    // start selection
+    Collection<Store> stores = region.stores.values();
+    Set<Store> specificStoresToFlush = new HashSet<Store>();
+    for (Store store : stores) {
+      if (shouldFlush(store)) {
+        specificStoresToFlush.add(store);
+      }
+    }
+    if (!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
+
+    // Didn't find any CFs which were above the threshold for selection.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Since none of the CFs were above the size, flushing all.");
+    }
+    return stores;
+  }
+
+  @Override
+  protected boolean shouldFlush(Store store) {
+    return (super.shouldFlush(store) || region.shouldFlushStore(store));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
index b4d47c7..49cb747 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -31,7 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
  * enough, then all stores will be flushed.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class FlushLargeStoresPolicy extends FlushPolicy {
+public abstract class FlushLargeStoresPolicy extends FlushPolicy {
 
   private static final Log LOG = LogFactory.getLog(FlushLargeStoresPolicy.class);
 
@@ -41,20 +37,13 @@ public class FlushLargeStoresPolicy extends FlushPolicy {
   public static final String HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN =
       "hbase.hregion.percolumnfamilyflush.size.lower.bound.min";
 
-  private static final long DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN =
+  public static final long DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN =
       1024 * 1024 * 16L;
 
-  private long flushSizeLowerBound = -1;
+  protected long flushSizeLowerBound = -1;
 
-  @Override
-  protected void configureForRegion(HRegion region) {
-    super.configureForRegion(region);
+  protected long getFlushSizeLowerBound(HRegion region) {
     int familyNumber = region.getTableDesc().getFamilies().size();
-    if (familyNumber <= 1) {
-      // No need to parse and set flush size lower bound if only one family
-      // Family number might also be zero in some of our unit test case
-      return;
-    }
     // For multiple families, lower bound is the "average flush size" by default
     // unless setting in configuration is larger.
     long flushSizeLowerBound = region.getMemstoreFlushSize() / familyNumber;
@@ -85,44 +74,19 @@ public class FlushLargeStoresPolicy extends FlushPolicy {
 
       }
     }
-    this.flushSizeLowerBound = flushSizeLowerBound;
+    return flushSizeLowerBound;
   }
 
-  private boolean shouldFlush(Store store) {
+  protected boolean shouldFlush(Store store) {
     if (store.getMemStoreSize() > this.flushSizeLowerBound) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " +
-          region.getRegionInfo().getEncodedName() + " because memstoreSize=" +
-          store.getMemStoreSize() + " > lower bound=" + this.flushSizeLowerBound);
+            region.getRegionInfo().getEncodedName() + " because memstoreSize=" +
+            store.getMemStoreSize() + " > lower bound=" + this.flushSizeLowerBound);
       }
       return true;
     }
-    return region.shouldFlushStore(store);
-  }
-
-  @Override
-  public Collection<Store> selectStoresToFlush() {
-    // no need to select stores if only one family
-    if (region.getTableDesc().getFamilies().size() == 1) {
-      return region.stores.values();
-    }
-    // start selection
-    Collection<Store> stores = region.stores.values();
-    Set<Store> specificStoresToFlush = new HashSet<Store>();
-    for (Store store : stores) {
-      if (shouldFlush(store)) {
-        specificStoresToFlush.add(store);
-      }
-    }
-    // Didn't find any CFs which were above the threshold for selection.
-    if (specificStoresToFlush.isEmpty()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Since none of the CFs were above the size, flushing all.");
-      }
-      return stores;
-    } else {
-      return specificStoresToFlush;
-    }
+    return false;
   }
 
 }

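As a configuration sketch, the minimum value of the per-family lower bound that shouldFlush()
compares against can be raised via the key defined above; the 32 MB value here is illustrative:

    Configuration conf = HBaseConfiguration.create();
    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN,
        32L * 1024 * 1024);  // the default minimum is 16 MB
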
http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
new file mode 100644
index 0000000..2921f23
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.HashSet;
+
+/**
+ * A {@link FlushPolicy} that only flushes stores larger than a given threshold. If no store is
+ * large enough, then all stores will be flushed.
+ * Gives priority to selecting regular stores first; only if there is no other
+ * option does it select sloppy stores, which normally require more memory.
+ */
+public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy {
+
+  private Collection<Store> regularStores = new HashSet<>();
+  private Collection<Store> sloppyStores = new HashSet<>();
+
+  /**
+   * @return the stores that need to be flushed.
+   */
+  @Override public Collection<Store> selectStoresToFlush() {
+    Collection<Store> specificStoresToFlush = new HashSet<Store>();
+    for(Store store : regularStores) {
+      if(shouldFlush(store) || region.shouldFlushStore(store)) {
+        specificStoresToFlush.add(store);
+      }
+    }
+    if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
+    for(Store store : sloppyStores) {
+      if(shouldFlush(store)) {
+        specificStoresToFlush.add(store);
+      }
+    }
+    if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
+    return region.stores.values();
+  }
+
+  @Override
+  protected void configureForRegion(HRegion region) {
+    super.configureForRegion(region);
+    this.flushSizeLowerBound = getFlushSizeLowerBound(region);
+    for(Store store : region.stores.values()) {
+      if(store.getMemStore().isSloppy()) {
+        sloppyStores.add(store);
+      } else {
+        regularStores.add(store);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
index e80b696..b93594e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
@@ -41,7 +41,7 @@ public class FlushPolicyFactory {
   public static final String HBASE_FLUSH_POLICY_KEY = "hbase.regionserver.flush.policy";
 
   private static final Class<? extends FlushPolicy> DEFAULT_FLUSH_POLICY_CLASS =
-      FlushLargeStoresPolicy.class;
+      FlushAllLargeStoresPolicy.class;
 
   /**
    * Create the FlushPolicy configured for the given table.

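A flush policy can also be selected explicitly through this key, as the hbase-it change above does;
a hedged sketch, assuming an existing Configuration 'conf' (the factory default after this patch is
FlushAllLargeStoresPolicy):

    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY,
        FlushNonSloppyStoresFirstPolicy.class.getName());
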
http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index bfa1f80..8634e37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -514,6 +514,10 @@ public class HMobStore extends HStore {
   @Override public void finalizeFlush() {
   }
 
+  @Override public MemStore getMemStore() {
+    return null;
+  }
+
   public void updateCellsCountCompactedToMob(long count) {
     cellsCountCompactedToMob += count;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e7a99a9..e5f9d50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -19,6 +19,19 @@ package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.TextFormat;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -182,19 +195,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.TextFormat;
 
 @SuppressWarnings("deprecation")
 @InterfaceAudience.Private
@@ -923,11 +923,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         });
       }
       boolean allStoresOpened = false;
+      boolean hasSloppyStores = false;
       try {
         for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
           Future<HStore> future = completionService.take();
           HStore store = future.get();
           this.stores.put(store.getFamily().getName(), store);
+          MemStore memStore = store.getMemStore();
+          if (memStore != null && memStore.isSloppy()) {
+            hasSloppyStores = true;
+          }
 
           long storeMaxSequenceId = store.getMaxSequenceId();
           maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
@@ -941,6 +946,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
         }
         allStoresOpened = true;
+        if (hasSloppyStores) {
+          htableDescriptor.setFlushPolicyClassName(FlushNonSloppyStoresFirstPolicy.class
+              .getName());
+          LOG.info("Setting FlushNonSloppyStoresFirstPolicy for the region=" + this);
+        }
       } catch (InterruptedException e) {
         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
       } catch (ExecutionException e) {
@@ -1457,22 +1467,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       LOG.debug("Updates disabled for region " + this);
       // Don't flush the cache if we are aborting
       if (!abort && canFlush) {
+        int failedFlushCount = 0;
         int flushCount = 0;
-        while (this.memstoreSize.get() > 0) {
+        long tmp = 0;
+        long remainingSize = this.memstoreSize.get();
+        while (remainingSize > 0) {
           try {
-            if (flushCount++ > 0) {
-              int actualFlushes = flushCount - 1;
-              if (actualFlushes > 5) {
-                // If we tried 5 times and are unable to clear memory, abort
-                // so we do not lose data
-                throw new DroppedSnapshotException("Failed clearing memory after " +
-                  actualFlushes + " attempts on region: " +
-                    Bytes.toStringBinary(getRegionInfo().getRegionName()));
-              }
-              LOG.info("Running extra flush, " + actualFlushes +
-                " (carrying snapshot?) " + this);
-            }
             internalFlushcache(status);
+            if (flushCount > 0) {
+              LOG.info("Running extra flush, " + flushCount +
+                  " (carrying snapshot?) " + this);
+            }
+            flushCount++;
+            tmp = this.memstoreSize.get();
+            if (tmp >= remainingSize) {
+              failedFlushCount++;
+            }
+            remainingSize = tmp;
+            if (failedFlushCount > 5) {
+              // If we failed 5 times and are unable to clear memory, abort
+              // so we do not lose data
+              throw new DroppedSnapshotException("Failed clearing memory after " +
+                  flushCount + " attempts on region: " +
+                  Bytes.toStringBinary(getRegionInfo().getRegionName()));
+            }
           } catch (IOException ioe) {
             status.setStatus("Failed flush " + this + ", putting online again");
             synchronized (writestate) {
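
The reworked close-time loop above keeps flushing while the memstore still holds data and only aborts once more than five flushes have failed to shrink it. A simplified, self-contained sketch of that retry policy follows; it is not code from the patch, and the FlushableRegion interface is a placeholder for the HRegion surface it exercises.

    import java.io.IOException;

    public class CloseFlushRetrySketch {
      // Placeholder for the minimal HRegion surface the sketch needs.
      interface FlushableRegion {
        long memstoreSize();
        void flush() throws IOException;
      }

      static void drainOnClose(FlushableRegion region) throws IOException {
        int failedFlushCount = 0;
        int flushCount = 0;
        long remaining = region.memstoreSize();
        while (remaining > 0) {
          region.flush();
          flushCount++;
          long after = region.memstoreSize();
          if (after >= remaining) {
            failedFlushCount++;          // this flush freed no memory
          }
          remaining = after;
          if (failedFlushCount > 5) {
            // Mirrors the DroppedSnapshotException thrown above.
            throw new IOException("Failed clearing memory after " + flushCount + " attempts");
          }
        }
      }
    }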

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 8dfa0e0..2d1b9a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -230,8 +230,15 @@ public class HStore implements Store {
     // to clone it?
     scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
     String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
-    this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
-        Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });
+    if (family.isInMemoryCompaction()) {
+      className = CompactingMemStore.class.getName();
+      this.memstore = new CompactingMemStore(conf, this.comparator, this,
+          this.getHRegion().getRegionServicesForStores());
+    } else {
+      this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
+          Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });
+    }
+    LOG.info("Memstore class name is " + className);
     this.offPeakHours = OffPeakHours.getInstance(conf);
 
     // Setting up cache configuration for this family
@@ -2149,7 +2156,7 @@ public class HStore implements Store {
     @Override
     public void prepare() {
       // passing the current sequence number of the wal - to allow bookkeeping in the memstore
-      this.snapshot = memstore.snapshot(cacheFlushSeqNum);
+      this.snapshot = memstore.snapshot();
       this.cacheFlushCount = snapshot.getCellsCount();
       this.cacheFlushSize = snapshot.getSize();
       committedFiles = new ArrayList<Path>(1);
@@ -2476,6 +2483,10 @@ public class HStore implements Store {
     memstore.finalizeFlush();
   }
 
+  @Override public MemStore getMemStore() {
+    return memstore;
+  }
+
   private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
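
With the hunk above, HStore instantiates a CompactingMemStore whenever the column family carries the in-memory-compaction flag, regardless of the configured memstore class. A hedged sketch of a table descriptor that enables the flag: it assumes HColumnDescriptor exposes a boolean setter paired with the isInMemoryCompaction() check used above (the setter name is an assumption), and the table and family names are placeholders.

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    public class CompactingMemStoreTableSketch {
      public static HTableDescriptor descriptorWithInMemoryCompaction() {
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t"));   // placeholder table
        HColumnDescriptor hcd = new HColumnDescriptor("cf");                   // placeholder family
        // Assumed setter mirroring isInMemoryCompaction(); with it set, HStore
        // above builds a CompactingMemStore for this family.
        hcd.setInMemoryCompaction(true);
        htd.addFamily(hcd);
        return htd;
      }
    }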

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 70a608d..13d9fbf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -64,8 +64,4 @@ public class ImmutableSegment extends Segment {
     return this.timeRange.getMin();
   }
 
-  @Override
-  protected void updateMetaInfo(Cell toAdd, long s) {
-    throw new IllegalAccessError("This is an immutable segment");
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index ea72b7f..00d49d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -42,19 +42,10 @@ public interface MemStore extends HeapSize {
   MemStoreSnapshot snapshot();
 
   /**
-   * Creates a snapshot of the current memstore. Snapshot must be cleared by call to
-   * {@link #clearSnapshot(long)}.
-   * @param flushOpSeqId the current sequence number of the wal; to be attached to the flushed
-   *                     segment
-   * @return {@link MemStoreSnapshot}
-   */
-  MemStoreSnapshot snapshot(long flushOpSeqId);
-
-  /**
    * Clears the current snapshot of the Memstore.
    * @param id
    * @throws UnexpectedStateException
-   * @see #snapshot(long)
+   * @see #snapshot()
    */
   void clearSnapshot(long id) throws UnexpectedStateException;
 
@@ -144,4 +135,7 @@ public interface MemStore extends HeapSize {
    * One example is to update the wal with sequence number that is known only at the store level.
    */
   void finalizeFlush();
+
+  /** Returns true if the memstore may need some extra memory space. */
+  boolean isSloppy();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
new file mode 100644
index 0000000..88e067e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -0,0 +1,197 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The ongoing MemStore compaction manager: it dispatches a single running compaction
+ * and interrupts that compaction if requested.
+ * A MemStoreScanner is used to traverse the compaction pipeline; it is wrapped in an
+ * internal store scanner, where all of the compaction logic is implemented.
+ * Thread safety: the segments in the compaction pipeline are assumed to be immutable,
+ * therefore no special synchronization is required.
+ */
+class MemStoreCompactor {
+
+  private static final Log LOG = LogFactory.getLog(MemStoreCompactor.class);
+  private CompactingMemStore compactingMemStore;
+  private MemStoreScanner scanner;            // scanner for pipeline only
+  // scanner on top of MemStoreScanner that uses ScanQueryMatcher
+  private StoreScanner compactingScanner;
+
+  // smallest read point for any ongoing MemStore scan
+  private long smallestReadPoint;
+
+  // a static version of the segment list from the pipeline
+  private VersionedSegmentsList versionedList;
+  private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
+
+  public MemStoreCompactor(CompactingMemStore compactingMemStore) {
+    this.compactingMemStore = compactingMemStore;
+  }
+
+  /**
+   * Requests dispatch of the asynchronous compaction task.
+   * Returns true if the compaction was successfully dispatched, or false if there is
+   * already an ongoing compaction or nothing to compact.
+   */
+  public boolean startCompaction() throws IOException {
+    if (!compactingMemStore.hasCompactibleSegments()) return false;  // no compaction on empty
+
+    List<SegmentScanner> scanners = new ArrayList<SegmentScanner>();
+    // get the list of segments from the pipeline
+    versionedList = compactingMemStore.getCompactibleSegments();
+    // the list is marked with specific version
+
+    // create the list of scanners with maximally possible read point, meaning that
+    // all KVs are going to be returned by the pipeline traversing
+    for (Segment segment : versionedList.getStoreSegments()) {
+      scanners.add(segment.getSegmentScanner(Long.MAX_VALUE));
+    }
+    scanner =
+        new MemStoreScanner(compactingMemStore, scanners, Long.MAX_VALUE,
+            MemStoreScanner.Type.COMPACT_FORWARD);
+
+    smallestReadPoint = compactingMemStore.getSmallestReadPoint();
+    compactingScanner = createScanner(compactingMemStore.getStore());
+
+    LOG.info("Starting the MemStore in-memory compaction for store " +
+        compactingMemStore.getStore().getColumnFamilyName());
+
+    doCompaction();
+    return true;
+  }
+
+  /**
+   * Requests cancellation of the asynchronous compaction task.
+   * The compaction may still complete if the request arrives too late.
+   * This is a non-blocking request.
+   */
+  public void stopCompact() {
+    isInterrupted.compareAndSet(false, true);
+  }
+
+
+  /**
+   * Closes the scanners and clears the pointers in order to allow good
+   * garbage collection.
+   */
+  private void releaseResources() {
+    isInterrupted.set(false);
+    scanner.close();
+    scanner = null;
+    compactingScanner.close();
+    compactingScanner = null;
+    versionedList = null;
+  }
+
+  /**
+   * The worker thread performs the compaction asynchronously.
+   * The solo (per compactor) thread only reads the compaction pipeline.
+   * There is at most one such thread per memstore instance.
+   */
+  private void doCompaction() {
+
+    ImmutableSegment result = SegmentFactory.instance()  // create the scanner
+        .createImmutableSegment(
+            compactingMemStore.getConfiguration(), compactingMemStore.getComparator(),
+            CompactingMemStore.DEEP_OVERHEAD_PER_PIPELINE_ITEM);
+
+    // the compaction processing
+    try {
+      // Phase I: create the compacted MutableCellSetSegment
+      compactSegments(result);
+
+      // Phase II: swap the old compaction pipeline
+      if (!isInterrupted.get()) {
+        compactingMemStore.swapCompactedSegments(versionedList, result);
+        // update the wal so it can be truncated and not get too long
+        compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
+      }
+    } catch (Exception e) {
+      LOG.debug("Interrupting the MemStore in-memory compaction for store " + compactingMemStore
+          .getFamilyName());
+      Thread.currentThread().interrupt();
+      return;
+    } finally {
+      releaseResources();
+      compactingMemStore.setInMemoryFlushInProgress(false);
+    }
+
+  }
+
+  /**
+   * Creates the scanner for compacting the pipeline.
+   *
+   * @return the scanner
+   */
+  private StoreScanner createScanner(Store store) throws IOException {
+
+    Scan scan = new Scan();
+    scan.setMaxVersions();  //Get all available versions
+
+    StoreScanner internalScanner =
+        new StoreScanner(store, store.getScanInfo(), scan, Collections.singletonList(scanner),
+            ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
+
+    return internalScanner;
+  }
+
+  /**
+   * Updates the given single Segment using the internal store scanner,
+   * which in turn uses ScanQueryMatcher.
+   */
+  private void compactSegments(Segment result) throws IOException {
+
+    List<Cell> kvs = new ArrayList<Cell>();
+    // get the limit to the size of the groups to be returned by compactingScanner
+    int compactionKVMax = compactingMemStore.getConfiguration().getInt(
+        HConstants.COMPACTION_KV_MAX,
+        HConstants.COMPACTION_KV_MAX_DEFAULT);
+
+    ScannerContext scannerContext =
+        ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
+
+    boolean hasMore;
+    do {
+      hasMore = compactingScanner.next(kvs, scannerContext);
+      if (!kvs.isEmpty()) {
+        for (Cell c : kvs) {
+          // The scanner is doing all the elimination logic
+          // now we just copy it to the new segment
+          Cell newKV = result.maybeCloneWithAllocator(c);
+          result.internalAdd(newKV);
+
+        }
+        kvs.clear();
+      }
+    } while (hasMore && (!isInterrupted.get()));
+  }
+}
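
MemStoreCompactor exposes only startCompaction() and stopCompact(); the owning CompactingMemStore is expected to run the former from a worker thread and call the latter when the pipeline has to be handed over to a flush. A rough sketch of such a driver follows; it is not code from the patch, and the Runnable wrapper and its error handling are assumptions.

    package org.apache.hadoop.hbase.regionserver;

    import java.io.IOException;

    // Sketch only: drives a MemStoreCompactor from a pooled worker thread.
    class InMemoryCompactionRunnableSketch implements Runnable {
      private final MemStoreCompactor compactor;

      InMemoryCompactionRunnableSketch(MemStoreCompactor compactor) {
        this.compactor = compactor;
      }

      @Override
      public void run() {
        try {
          // Dispatches at most one compaction; returns false when there is
          // nothing to compact or a compaction is already running.
          compactor.startCompaction();
        } catch (IOException e) {
          // A real caller would log the failure and clear the in-memory-flush flag here.
        }
      }
    }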

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
index d3c35b3..72f7bf5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
@@ -18,8 +18,15 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.StealJobQueue;
+import org.apache.hadoop.hbase.wal.WAL;
 
 /**
  * Services a Store needs from a Region.
@@ -32,6 +39,20 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public class RegionServicesForStores {
 
+  private static final int POOL_SIZE = 10;
+  private static final ThreadPoolExecutor INMEMORY_COMPACTION_POOL =
+      new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 60, TimeUnit.SECONDS,
+          new StealJobQueue<Runnable>().getStealFromQueue(),
+          new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+              Thread t = new Thread(r);
+              t.setName(Thread.currentThread().getName()
+                  + "-inmemoryCompactions-"
+                  + System.currentTimeMillis());
+              return t;
+            }
+          });
   private final HRegion region;
 
   public RegionServicesForStores(HRegion region) {
@@ -39,15 +60,37 @@ public class RegionServicesForStores {
   }
 
   public void blockUpdates() {
-    this.region.blockUpdates();
+    region.blockUpdates();
   }
 
   public void unblockUpdates() {
-    this.region.unblockUpdates();
+    region.unblockUpdates();
   }
 
   public long addAndGetGlobalMemstoreSize(long size) {
-    return this.region.addAndGetGlobalMemstoreSize(size);
+    return region.addAndGetGlobalMemstoreSize(size);
+  }
+
+  public HRegionInfo getRegionInfo() {
+    return region.getRegionInfo();
   }
 
+  public WAL getWAL() {
+    return region.getWAL();
+  }
+
+  public ThreadPoolExecutor getInMemoryCompactionPool() { return INMEMORY_COMPACTION_POOL; }
+
+  public long getMemstoreFlushSize() {
+    return region.getMemstoreFlushSize();
+  }
+
+  public int getNumStores() {
+    return region.getStores().size();
+  }
+
+  // methods for tests
+  long getGlobalMemstoreTotalSize() {
+    return region.getMemstoreSize();
+  }
 }
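
The region services now expose a shared fixed-size pool (POOL_SIZE worker threads backed by a StealJobQueue) for in-memory compactions. A minimal sketch of handing a task to that pool; the Runnable is a placeholder and not part of the patch.

    import java.util.concurrent.ThreadPoolExecutor;

    import org.apache.hadoop.hbase.regionserver.RegionServicesForStores;

    public class InMemoryCompactionSubmitSketch {
      // Sketch only: submits an in-memory compaction task to the shared pool above.
      public static void submit(RegionServicesForStores services, Runnable compactionTask) {
        ThreadPoolExecutor pool = services.getInMemoryCompactionPool();
        pool.execute(compactionTask);   // runs on one of the shared worker threads
      }
    }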

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index dcad5a0..6435232 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -23,6 +23,7 @@ import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.KeyValue;
@@ -40,8 +41,11 @@ import org.apache.hadoop.hbase.util.ByteRange;
  */
 @InterfaceAudience.Private
 public abstract class Segment {
+
+  private static final Log LOG = LogFactory.getLog(Segment.class);
   private volatile CellSet cellSet;
   private final CellComparator comparator;
+  private long minSequenceId;
   private volatile MemStoreLAB memStoreLAB;
   protected final AtomicLong size;
   protected volatile boolean tagsPresent;
@@ -51,6 +55,7 @@ public abstract class Segment {
       long size) {
     this.cellSet = cellSet;
     this.comparator = comparator;
+    this.minSequenceId = Long.MAX_VALUE;
     this.memStoreLAB = memStoreLAB;
     this.size = new AtomicLong(size);
     this.tagsPresent = false;
@@ -60,6 +65,7 @@ public abstract class Segment {
   protected Segment(Segment segment) {
     this.cellSet = segment.getCellSet();
     this.comparator = segment.getComparator();
+    this.minSequenceId = segment.getMinSequenceId();
     this.memStoreLAB = segment.getMemStoreLAB();
     this.size = new AtomicLong(segment.getSize());
     this.tagsPresent = segment.isTagsPresent();
@@ -75,6 +81,14 @@ public abstract class Segment {
   }
 
   /**
+   * Creates a scanner for the given read point and with the given order among the store scanners.
+   * @return a scanner for the given read point
+   */
+  public SegmentScanner getSegmentScanner(long readPoint, long order) {
+    return new SegmentScanner(this, readPoint, order);
+  }
+
+  /**
    * Returns whether the segment has any cells
    * @return whether the segment has any cells
    */
@@ -183,6 +197,10 @@ public abstract class Segment {
     size.addAndGet(delta);
   }
 
+  public long getMinSequenceId() {
+    return minSequenceId;
+  }
+
   public TimeRangeTracker getTimeRangeTracker() {
     return this.timeRangeTracker;
   }
@@ -231,10 +249,18 @@ public abstract class Segment {
     return s;
   }
 
-  /**
-   * Only mutable Segments implement this.
-   */
-  protected abstract void updateMetaInfo(Cell toAdd, long s);
+  protected void updateMetaInfo(Cell toAdd, long s) {
+    getTimeRangeTracker().includeTimestamp(toAdd);
+    size.addAndGet(s);
+    minSequenceId = Math.min(minSequenceId, toAdd.getSequenceId());
+    // In the no-tags case NoTagsKeyValue.getTagsLength() is a cheap call.
+    // When the ACL CP or Visibility CP, which deal with tags during mutation,
+    // are in use, TagRewriteCell.getTagsLength() is also a cheap call; we do not
+    // parse the byte[] to determine the tags length.
+    if (toAdd.getTagsLength() > 0) {
+      tagsPresent = true;
+    }
+  }
 
   /**
    * Returns a subset of the segment cell set, which starts with the given cell

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 394ffa1..7ac80ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -44,16 +44,16 @@ public final class SegmentFactory {
       final CellComparator comparator, long size) {
     MemStoreLAB memStoreLAB = getMemStoreLAB(conf);
     MutableSegment segment = generateMutableSegment(conf, comparator, memStoreLAB, size);
-    return createImmutableSegment(conf, segment);
+    return createImmutableSegment(segment);
   }
 
   public ImmutableSegment createImmutableSegment(CellComparator comparator,
       long size) {
     MutableSegment segment = generateMutableSegment(null, comparator, null, size);
-    return createImmutableSegment(null, segment);
+    return createImmutableSegment(segment);
   }
 
-  public ImmutableSegment createImmutableSegment(final Configuration conf, MutableSegment segment) {
+  public ImmutableSegment createImmutableSegment(MutableSegment segment) {
     return new ImmutableSegment(segment);
   }
   public MutableSegment createMutableSegment(final Configuration conf,

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 45f72d83..a04c1da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -62,6 +62,7 @@ public class SegmentScanner implements KeyValueScanner {
 
   /**
    * @param scannerOrder see {@link KeyValueScanner#getScannerOrder()}.
+   * Scanners are ordered from 0 (oldest) upwards; a higher order means a more recent scanner.
    */
   protected SegmentScanner(Segment segment, long readPoint, long scannerOrder) {
     this.segment = segment;
@@ -84,7 +85,6 @@ public class SegmentScanner implements KeyValueScanner {
       throw new RuntimeException("current is invalid: read point is "+readPoint+", " +
           "while current sequence id is " +current.getSequenceId());
     }
-
     return current;
   }
 
@@ -172,9 +172,8 @@ public class SegmentScanner implements KeyValueScanner {
    */
   @Override
   public boolean seekToPreviousRow(Cell cell) throws IOException {
-    boolean keepSeeking = false;
+    boolean keepSeeking;
     Cell key = cell;
-
     do {
       Cell firstKeyOnRow = CellUtil.createFirstOnRow(key);
       SortedSet<Cell> cellHead = segment.headSet(firstKeyOnRow);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index b77a33b..3419937 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -520,4 +520,5 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
    */
   void finalizeFlush();
 
+  MemStore getMemStore();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
new file mode 100644
index 0000000..9d7a723
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
@@ -0,0 +1,54 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.LinkedList;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A list of immutable segments coupled with the version of the memstore pipeline at the time
+ * the list was created.
+ * This structure helps to guarantee that the compaction pipeline is updated after a compaction
+ * in a consistent (atomic) way.
+ * Specifically, swapping some of the elements in a compaction pipeline with a new compacted
+ * element is permitted only if the pipeline version is the same as the version attached to the
+ * elements.
+ *
+ */
+@InterfaceAudience.Private
+public class VersionedSegmentsList {
+
+  private final LinkedList<ImmutableSegment> storeSegments;
+  private final long version;
+
+  public VersionedSegmentsList(
+          LinkedList<ImmutableSegment> storeSegments, long version) {
+    this.storeSegments = storeSegments;
+    this.version = version;
+  }
+
+  public LinkedList<ImmutableSegment> getStoreSegments() {
+    return storeSegments;
+  }
+
+  public long getVersion() {
+    return version;
+  }
+}
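
The version captured here is what makes the pipeline swap in MemStoreCompactor safe: a compacted segment may replace the segments it was built from only if nothing else has changed the pipeline in the meantime. The following is an illustrative compare-the-version sketch of that guarantee; the real logic lives in CompactionPipeline, which is not shown in this hunk, so the field names and locking below are assumptions.

    package org.apache.hadoop.hbase.regionserver;

    import java.util.LinkedList;

    // Illustrative only: the shape of the version-guarded swap that
    // VersionedSegmentsList enables.
    class PipelineSwapSketch {
      private long version = 0;
      private final LinkedList<ImmutableSegment> pipeline = new LinkedList<ImmutableSegment>();

      synchronized boolean swapIfUnchanged(VersionedSegmentsList read, ImmutableSegment compacted) {
        if (read.getVersion() != version) {
          return false;                     // a concurrent update raced us; give up
        }
        pipeline.removeAll(read.getStoreSegments());
        pipeline.add(compacted);            // the compacted segment takes their place
        version++;
        return true;
      }
    }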

http://git-wip-us.apache.org/repos/asf/hbase/blob/a27504c7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index ae48f6c..3aafc23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.management.ManagementFactory;
@@ -41,8 +43,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -772,6 +772,21 @@ public abstract class AbstractFSWAL<W> implements WAL {
     LOG.info("Closed WAL: " + toString());
   }
 
+  /**
+   * Updates the sequence number of a specific store.
+   * Depending on the flag, it either replaces the current sequence number only if the given
+   * one is greater, or replaces it even if it is lower than the existing one.
+   * @param encodedRegionName the encoded name of the region the store belongs to
+   * @param familyName the column family name of the store
+   * @param sequenceid the sequence id to record for the store
+   * @param onlyIfGreater whether to update only when the given id is greater than the current one
+   */
+  @Override public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
+      boolean onlyIfGreater) {
+    sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater);
+  }
+
+
   protected SyncFuture getSyncFuture(final long sequence, Span span) {
     SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
     if (syncFuture == null) {
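
The new updateStore() hook lets a store push its lowest unflushed sequence id straight into SequenceIdAccounting after an in-memory compaction, so WAL files can be truncated without a real flush. A hedged sketch of a caller; the byte[] keys and the sequence id are placeholders, and the WAL handle is assumed to come from RegionServicesForStores.getWAL() added earlier in this patch.

    import org.apache.hadoop.hbase.wal.WAL;

    public class WalUpdateStoreSketch {
      // Sketch only: advance the WAL accounting for one store after an in-memory compaction.
      public static void advance(WAL wal, byte[] encodedRegionName, byte[] familyName,
          long lowestUnflushedSeqId) {
        // onlyIfGreater=true mirrors updateLowestUnflushedSequenceIdInWAL(true) in
        // MemStoreCompactor: the accounting is never moved backwards.
        wal.updateStore(encodedRegionName, familyName, lowestUnflushedSeqId, true);
      }
    }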

