hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anasta...@apache.org
Subject [2/2] hbase git commit: HBASE-18010: CellChunkMap integration into CompactingMemStore. CellChunkMap usage is currently switched off by default. New tests are included. Review comments addressed.
Date Wed, 05 Jul 2017 09:35:42 GMT
HBASE-18010: CellChunkMap integration into CompactingMemStore. CellChunkMap usage is currently switched off by default. New tests are included. Review comments addressed.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8ac43084
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8ac43084
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8ac43084

Branch: refs/heads/master
Commit: 8ac4308411df0528a3ef5a3aae0297154940cd9d
Parents: 8b63eb6
Author: anastas <anastas@yahoo-inc.com>
Authored: Wed Jul 5 12:35:21 2017 +0300
Committer: anastas <anastas@yahoo-inc.com>
Committed: Wed Jul 5 12:35:21 2017 +0300

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/util/ClassSize.java |  29 +-
 .../hadoop/hbase/regionserver/CellChunkMap.java |  31 +-
 .../hadoop/hbase/regionserver/ChunkCreator.java |  79 ++-
 .../hbase/regionserver/CompactingMemStore.java  |  38 +-
 .../hbase/regionserver/CompactionPipeline.java  |  28 +-
 .../regionserver/CompositeImmutableSegment.java |  10 +
 .../regionserver/ImmutableMemStoreLAB.java      |   9 +
 .../hbase/regionserver/ImmutableSegment.java    | 184 +------
 .../hbase/regionserver/MemStoreCompactor.java   |  16 +-
 .../hadoop/hbase/regionserver/MemStoreLAB.java  |   6 +
 .../hbase/regionserver/MemStoreLABImpl.java     |  11 +
 .../hbase/regionserver/MutableSegment.java      |   5 +
 .../hadoop/hbase/regionserver/Segment.java      |   8 +-
 .../hbase/regionserver/SegmentFactory.java      |  76 ++-
 .../apache/hadoop/hbase/io/TestHeapSize.java    |  29 +-
 .../hbase/regionserver/TestCellFlatSet.java     |  15 +-
 .../regionserver/TestCompactingMemStore.java    | 134 +++--
 .../TestCompactingToCellArrayMapMemStore.java   | 492 -------------------
 .../hbase/regionserver/TestDefaultMemStore.java |   7 +-
 .../regionserver/TestHRegionReplayEvents.java   |   5 +-
 .../regionserver/TestPerColumnFamilyFlush.java  |  18 +-
 .../hadoop/hbase/regionserver/TestStore.java    |  10 +-
 .../TestWalAndCompactingMemStoreFlush.java      |  28 +-
 23 files changed, 454 insertions(+), 814 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index e064cc0..4990ac0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -89,6 +89,15 @@ public class ClassSize {
   /** Overhead for ConcurrentSkipListMap Entry */
   public static final int CONCURRENT_SKIPLISTMAP_ENTRY;
 
+  /** Overhead for CellFlatMap */
+  public static final int CELL_FLAT_MAP;
+
+  /** Overhead for CellChunkMap */
+  public static final int CELL_CHUNK_MAP;
+
+  /** Overhead for Cell Chunk Map Entry */
+  public static final int CELL_CHUNK_MAP_ENTRY;
+
   /** Overhead for CellArrayMap */
   public static final int CELL_ARRAY_MAP;
 
@@ -275,13 +284,17 @@ public class ClassSize {
     // The size changes from jdk7 to jdk8, estimate the size rather than use a conditional
     CONCURRENT_SKIPLISTMAP = (int) estimateBase(ConcurrentSkipListMap.class, false);
 
-    // CELL_ARRAY_MAP is the size of an instance of CellArrayMap class, which extends
-    // CellFlatMap class. CellArrayMap object containing a ref to an Array, so
-    // OBJECT + REFERENCE + ARRAY
     // CellFlatMap object contains two integers, one boolean and one reference to object, so
     // 2*INT + BOOLEAN + REFERENCE
-    CELL_ARRAY_MAP = align(OBJECT + 2*Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN
-        + ARRAY + 2*REFERENCE);
+    CELL_FLAT_MAP = OBJECT + 2*Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN + REFERENCE;
+
+    // CELL_ARRAY_MAP is the size of an instance of CellArrayMap class, which extends
+    // CellFlatMap class. CellArrayMap object containing a ref to an Array of Cells
+    CELL_ARRAY_MAP = align(CELL_FLAT_MAP + REFERENCE + ARRAY);
+
+    // CELL_CHUNK_MAP is the size of an instance of CellChunkMap class, which extends
+    // CellFlatMap class. CellChunkMap object containing a ref to an Array of Chunks
+    CELL_CHUNK_MAP = align(CELL_FLAT_MAP + REFERENCE + ARRAY);
 
     CONCURRENT_SKIPLISTMAP_ENTRY = align(
         align(OBJECT + (3 * REFERENCE)) + /* one node per entry */
@@ -290,6 +303,12 @@ public class ClassSize {
     // REFERENCE in the CellArrayMap all the rest is counted in KeyValue.heapSize()
     CELL_ARRAY_MAP_ENTRY = align(REFERENCE);
 
+    // The Cell Representation in the CellChunkMap, the Cell object size shouldn't be counted
+    // in KeyValue.heapSize()
+    // each cell-representation requires three integers for chunkID (reference to the ByteBuffer),
+    // offset and length, and one long for seqID
+    CELL_CHUNK_MAP_ENTRY = 3*Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG;
+
     REENTRANT_LOCK = align(OBJECT + (3 * REFERENCE));
 
     ATOMIC_LONG = align(OBJECT + Bytes.SIZEOF_LONG);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
index a965ade..2ab10f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.ClassSize;
 
 import java.util.Comparator;
 
@@ -56,11 +57,12 @@ import java.util.Comparator;
 public class CellChunkMap extends CellFlatMap {
 
   private final Chunk[] chunks;             // the array of chunks, on which the index is based
-  private final int numOfCellsInsideChunk;  // constant number of cell-representations in a chunk
 
-  // each cell-representation requires three integers for chunkID (reference to the ByteBuffer),
-  // offset and length, and one long for seqID
-  public static final int SIZEOF_CELL_REP = 3*Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG ;
+  // constant number of cell-representations in a chunk
+  // each chunk starts with its own ID following the cells data
+  public static final int NUM_OF_CELL_REPS_IN_CHUNK =
+      (ChunkCreator.getInstance().getChunkSize() - ChunkCreator.SIZEOF_CHUNK_HEADER) /
+          ClassSize.CELL_CHUNK_MAP_ENTRY;
 
   /**
    * C-tor for creating CellChunkMap from existing Chunk array, which must be ordered
@@ -75,9 +77,6 @@ public class CellChunkMap extends CellFlatMap {
       Chunk[] chunks, int min, int max, boolean descending) {
     super(comparator, min, max, descending);
     this.chunks = chunks;
-    this.numOfCellsInsideChunk = // each chunk starts with its own ID following the cells data
-        (ChunkCreator.getInstance().getChunkSize() - Bytes.SIZEOF_INT) / SIZEOF_CELL_REP;
-
   }
 
   /* To be used by base (CellFlatMap) class only to create a sub-CellFlatMap
@@ -91,20 +90,20 @@ public class CellChunkMap extends CellFlatMap {
   @Override
   protected Cell getCell(int i) {
     // get the index of the relevant chunk inside chunk array
-    int chunkIndex = (i / numOfCellsInsideChunk);
+    int chunkIndex = (i / NUM_OF_CELL_REPS_IN_CHUNK);
     ByteBuffer block = chunks[chunkIndex].getData();// get the ByteBuffer of the relevant chunk
-    int j = i - chunkIndex * numOfCellsInsideChunk; // get the index of the cell-representation
+    int j = i - chunkIndex * NUM_OF_CELL_REPS_IN_CHUNK; // get the index of the cell-representation
 
     // find inside the offset inside the chunk holding the index, skip bytes for chunk id
-    int offsetInBytes = Bytes.SIZEOF_INT + j* SIZEOF_CELL_REP;
-
+    int offsetInBytes = ChunkCreator.SIZEOF_CHUNK_HEADER + j* ClassSize.CELL_CHUNK_MAP_ENTRY;
 
     // find the chunk holding the data of the cell, the chunkID is stored first
     int chunkId = ByteBufferUtils.toInt(block, offsetInBytes);
     Chunk chunk = ChunkCreator.getInstance().getChunk(chunkId);
     if (chunk == null) {
-      // this should not happen, putting an assertion here at least for the testing period
-      assert false;
+      // this should not happen
+      throw new IllegalArgumentException("In CellChunkMap, cell must be associated with chunk."
+          + ". We were looking for a cell at index " + i);
     }
 
     // find the offset of the data of the cell, skip integer for chunkID, offset is stored second
@@ -118,8 +117,10 @@ public class CellChunkMap extends CellFlatMap {
 
     ByteBuffer buf = chunk.getData();   // get the ByteBuffer where the cell data is stored
     if (buf == null) {
-      // this should not happen, putting an assertion here at least for the testing period
-      assert false;
+      // this should not happen
+      throw new IllegalArgumentException("In CellChunkMap, chunk must be associated with ByteBuffer."
+          + " Chunk: " + chunk + " Chunk ID: " + chunk.getId() + ", is from pool: "
+          + chunk.isFromPool() + ". We were looking for a cell at index " + i);
     }
 
     return new ByteBufferChunkCell(buf, offsetOfCell, lengthOfCell, cellSeqID);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
index d550148..baf9a7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +35,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -51,9 +52,28 @@ public class ChunkCreator {
   private AtomicInteger chunkID = new AtomicInteger(1);
   // maps the chunk against the monotonically increasing chunk id. We need to preserve the
   // natural ordering of the key
-  // CellChunkMap creation should convert the soft ref to hard reference
-  private Map<Integer, SoftReference<Chunk>> chunkIdMap =
-      new ConcurrentHashMap<Integer, SoftReference<Chunk>>();
+  // CellChunkMap creation should convert the weak ref to hard reference
+
+  // chunk id of each chunk is the first integer written on each chunk,
+  // the header size need to be changed in case chunk id size is changed
+  public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT;
+
+  // An object pointed by a weak reference can be garbage collected, in opposite to an object
+  // referenced by a strong (regular) reference. Every chunk created via ChunkCreator is referenced
+  // from either weakChunkIdMap or strongChunkIdMap.
+  // Upon chunk C creation, C's ID is mapped into weak reference to C, in order not to disturb C's
+  // GC in case all other reference to C are going to be removed.
+  // When chunk C is referenced from CellChunkMap (via C's ID) it is possible to GC the chunk C.
+  // To avoid that upon inserting C into CellChunkMap, C's ID is mapped into strong (regular)
+  // reference to C.
+
+  // map that doesn't influence GC
+  private Map<Integer, WeakReference<Chunk>> weakChunkIdMap =
+      new ConcurrentHashMap<Integer, WeakReference<Chunk>>();
+
+  // map that keeps chunks from garbage collection
+  private Map<Integer, Chunk> strongChunkIdMap = new ConcurrentHashMap<Integer, Chunk>();
+
   private final int chunkSize;
   private final boolean offheap;
   @VisibleForTesting
@@ -119,8 +139,8 @@ public class ChunkCreator {
     if (chunk == null) {
       chunk = createChunk();
     }
-    // put this chunk into the chunkIdMap
-    this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk));
+    // put this chunk initially into the weakChunkIdMap
+    this.weakChunkIdMap.put(chunk.getId(), new WeakReference<>(chunk));
     // now we need to actually do the expensive memory allocation step in case of a new chunk,
     // else only the offset is set to the beginning of the chunk to accept allocations
     chunk.init();
@@ -148,12 +168,36 @@ public class ChunkCreator {
   }
 
   @VisibleForTesting
-  // TODO : To be used by CellChunkMap
+  // Used to translate the ChunkID into a chunk ref
   Chunk getChunk(int id) {
-    SoftReference<Chunk> ref = chunkIdMap.get(id);
+    WeakReference<Chunk> ref = weakChunkIdMap.get(id);
     if (ref != null) {
       return ref.get();
     }
+    // check also the strong mapping
+    return strongChunkIdMap.get(id);
+  }
+
+  // transfer the weak pointer to be a strong chunk pointer
+  Chunk saveChunkFromGC(int chunkID) {
+    Chunk c = strongChunkIdMap.get(chunkID); // check whether the chunk is already protected
+    if (c != null)                           // with strong pointer
+      return c;
+    WeakReference<Chunk> ref = weakChunkIdMap.get(chunkID);
+    if (ref != null) {
+      c = ref.get();
+    }
+    if (c != null) {
+      // put this strong reference to chunk into the strongChunkIdMap
+      // the read of the weakMap is always happening before the read of the strongMap
+      // so no synchronization issues here
+      this.strongChunkIdMap.put(chunkID, c);
+      this.weakChunkIdMap.remove(chunkID);
+      return c;
+    }
+    // we should actually never return null as someone should not ask to save from GC a chunk,
+    // which is already released. However, we are not asserting it here and we let the caller
+    // to deal with the return value an assert if needed
     return null;
   }
 
@@ -166,25 +210,30 @@ public class ChunkCreator {
   }
 
   private void removeChunks(Set<Integer> chunkIDs) {
-    this.chunkIdMap.keySet().removeAll(chunkIDs);
+    this.weakChunkIdMap.keySet().removeAll(chunkIDs);
+    this.strongChunkIdMap.keySet().removeAll(chunkIDs);
   }
 
   Chunk removeChunk(int chunkId) {
-    SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId);
-    if (ref != null) {
-      return ref.get();
+    WeakReference<Chunk> weak = this.weakChunkIdMap.remove(chunkId);
+    Chunk strong = this.strongChunkIdMap.remove(chunkId);
+    if (weak != null) {
+      return weak.get();
     }
-    return null;
+    return strong;
   }
 
   @VisibleForTesting
+  // the chunks in the weakChunkIdMap may already be released so we shouldn't relay
+  // on this counting for strong correctness. This method is used only in testing.
   int size() {
-    return this.chunkIdMap.size();
+    return this.weakChunkIdMap.size()+this.strongChunkIdMap.size();
   }
 
   @VisibleForTesting
   void clearChunkIds() {
-    this.chunkIdMap.clear();
+    this.strongChunkIdMap.clear();
+    this.weakChunkIdMap.clear();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 5b9372a..4a53c77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -57,6 +57,12 @@ public class CompactingMemStore extends AbstractMemStore {
       "hbase.hregion.compacting.memstore.type";
   public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
       String.valueOf(MemoryCompactionPolicy.BASIC);
+  // The external setting of the compacting MemStore behaviour
+  public static final String COMPACTING_MEMSTORE_INDEX_KEY =
+      "hbase.hregion.compacting.memstore.index";
+  // usage of CellArrayMap is default, later it will be decided how to use CellChunkMap
+  public static final String COMPACTING_MEMSTORE_INDEX_DEFAULT =
+      String.valueOf(IndexType.ARRAY_MAP);
   // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
   public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
       "hbase.memstore.inmemoryflush.threshold.factor";
@@ -78,10 +84,22 @@ public class CompactingMemStore extends AbstractMemStore {
   private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
   private boolean compositeSnapshot = true;
 
+  /**
+   * Types of indexes (part of immutable segments) to be used after flattening,
+   * compaction, or merge are applied.
+   */
+  public enum IndexType {
+    CSLM_MAP,   // ConcurrentSkipLisMap
+    ARRAY_MAP,  // CellArrayMap
+    CHUNK_MAP   // CellChunkMap
+  }
+
+  private IndexType indexType = IndexType.ARRAY_MAP;  // default implementation
 
   public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
-      + 6 * ClassSize.REFERENCE     // Store, RegionServicesForStores, CompactionPipeline,
-                                    // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
+      + 7 * ClassSize.REFERENCE     // Store, RegionServicesForStores, CompactionPipeline,
+                                    // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
+                                    // indexType
       + Bytes.SIZEOF_LONG           // inmemoryFlushSize
       + 2 * Bytes.SIZEOF_BOOLEAN    // compositeSnapshot and inWalReplay
       + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
@@ -96,6 +114,8 @@ public class CompactingMemStore extends AbstractMemStore {
     this.pipeline = new CompactionPipeline(getRegionServices());
     this.compactor = createMemStoreCompactor(compactionPolicy);
     initInmemoryFlushSize(conf);
+    indexType = IndexType.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+        CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT));
   }
 
   @VisibleForTesting
@@ -294,7 +314,19 @@ public class CompactingMemStore extends AbstractMemStore {
    *           The flattening happens only if versions match.
    */
   public void flattenOneSegment(long requesterVersion) {
-    pipeline.flattenYoungestSegment(requesterVersion);
+    pipeline.flattenOneSegment(requesterVersion, indexType);
+  }
+
+  // setter is used only for testability
+  @VisibleForTesting
+  public void setIndexType() {
+    indexType = IndexType.valueOf(getConfiguration().get(
+        CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+        CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT));
+  }
+
+  public IndexType getIndexType() {
+    return indexType;
   }
 
   public boolean hasImmutableSegments() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 06e83a3..5136f24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
  * method accesses the read-only copy more than once it makes a local copy of it
  * to ensure it accesses the same copy.
  *
- * The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also
+ * The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also
  * protected by a lock since they need to have a consistent (atomic) view of the pipeline list
  * and version number.
  */
@@ -183,7 +183,7 @@ public class CompactionPipeline {
    *
    * @return true iff a segment was successfully flattened
    */
-  public boolean flattenYoungestSegment(long requesterVersion) {
+  public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType) {
 
     if(requesterVersion != version) {
       LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
@@ -196,17 +196,22 @@ public class CompactionPipeline {
         LOG.warn("Segment flattening failed, because versions do not match");
         return false;
       }
-
+      int i = 0;
       for (ImmutableSegment s : pipeline) {
-        // remember the old size in case this segment is going to be flatten
-        MemstoreSize memstoreSize = new MemstoreSize();
-        if (s.flatten(memstoreSize)) {
+        if ( s.canBeFlattened() ) {
+          MemstoreSize newMemstoreSize = new MemstoreSize(); // the size to be updated
+          ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
+              (CSLMImmutableSegment)s,idxType,newMemstoreSize);
+          replaceAtIndex(i,newS);
           if(region != null) {
-            region.addMemstoreSize(memstoreSize);
+            // update the global memstore size counter
+            // upon flattening there is no change in the data size
+            region.addMemstoreSize(new MemstoreSize(0, newMemstoreSize.getHeapSize()));
           }
           LOG.debug("Compaction pipeline segment " + s + " was flattened");
           return true;
         }
+        i++;
       }
 
     }
@@ -271,12 +276,19 @@ public class CompactionPipeline {
     if(segment != null) pipeline.addLast(segment);
   }
 
+  // replacing one segment in the pipeline with a new one exactly at the same index
+  // need to be called only within synchronized block
+  private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
+    pipeline.set(idx, newSegment);
+    readOnlyCopy = new LinkedList<>(pipeline);
+  }
+
   public Segment getTail() {
     List<? extends Segment> localCopy = getSegments();
     if(localCopy.isEmpty()) {
       return null;
     }
-    return localCopy.get(localCopy.size()-1);
+    return localCopy.get(localCopy.size() - 1);
   }
 
   private boolean addFirst(ImmutableSegment segment) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
index 2f89ec7..86e4d0a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
@@ -184,6 +184,16 @@ public class CompositeImmutableSegment extends ImmutableSegment {
     throw new IllegalStateException("Not supported by CompositeImmutableScanner");
   }
 
+
+  @Override
+  protected long indexEntrySize() {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  @Override protected boolean canBeFlattened() {
+    return false;
+  }
+
   /**
    * @return Sum of all cell sizes.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
index 430b642..9bc0fd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
@@ -46,6 +46,15 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
   }
 
   @Override
+  // returning a new chunk, without replacing current chunk,
+  // the space on this chunk will be allocated externally
+  // use the first MemStoreLABImpl in the list
+  public Chunk getNewExternalChunk() {
+    MemStoreLAB mslab = this.mslabs.get(0);
+    return mslab.getNewExternalChunk();
+  }
+
+  @Override
   public void close() {
     // 'openScannerCount' here tracks the scanners opened on segments which directly refer to this
     // MSLAB. The individual MSLABs this refers also having its own 'openScannerCount'. The usage of

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 19b66b4..9d53c7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -19,15 +19,12 @@
 package org.apache.hadoop.hbase.regionserver;
 
 
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.TimeRange;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -37,21 +34,11 @@ import java.util.List;
  * and is not needed for a {@link MutableSegment}.
  */
 @InterfaceAudience.Private
-public class ImmutableSegment extends Segment {
+public abstract class ImmutableSegment extends Segment {
 
-  private static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
-      + (2 * ClassSize.REFERENCE) // Refs to timeRange and type
-      + ClassSize.TIMERANGE;
-  public static final long DEEP_OVERHEAD_CSLM = DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
-  public static final long DEEP_OVERHEAD_CAM = DEEP_OVERHEAD + ClassSize.CELL_ARRAY_MAP;
-
-  /**
-   * Types of ImmutableSegment
-   */
-  public enum Type {
-    SKIPLIST_MAP_BASED,
-    ARRAY_MAP_BASED,
-  }
+  public static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
+      + ClassSize.align(ClassSize.REFERENCE // Referent to timeRange
+      + ClassSize.TIMERANGE);
 
   /**
    * This is an immutable segment so use the read-only TimeRange rather than the heavy-weight
@@ -59,12 +46,8 @@ public class ImmutableSegment extends Segment {
    */
   private final TimeRange timeRange;
 
-  private Type type = Type.SKIPLIST_MAP_BASED;
-
-  // whether it is based on CellFlatMap or ConcurrentSkipListMap
-  private boolean isFlat(){
-    return (type != Type.SKIPLIST_MAP_BASED);
-  }
+  // each sub-type of immutable segment knows whether it is flat or not
+  protected abstract boolean canBeFlattened();
 
   /////////////////////  CONSTRUCTORS  /////////////////////
   /**------------------------------------------------------------------------
@@ -76,59 +59,25 @@ public class ImmutableSegment extends Segment {
   }
 
   /**------------------------------------------------------------------------
-   * Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one.
-   * This C-tor should be used when active MutableSegment is pushed into the compaction
-   * pipeline and becomes an ImmutableSegment.
+   * C-tor to be used to build the derived classes
    */
-  protected ImmutableSegment(Segment segment) {
-    super(segment);
-    this.type = Type.SKIPLIST_MAP_BASED;
+  protected ImmutableSegment(CellSet cs, CellComparator comparator, MemStoreLAB memStoreLAB) {
+    super(cs, comparator, memStoreLAB);
     this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
   }
 
   /**------------------------------------------------------------------------
-   * C-tor to be used when new CELL_ARRAY BASED ImmutableSegment is a result of compaction of a
-   * list of older ImmutableSegments.
-   * The given iterator returns the Cells that "survived" the compaction.
-   * The input parameter "type" exists for future use when more types of flat ImmutableSegments
-   * are going to be introduced.
+   * Copy C-tor to be used when new CSLMImmutableSegment (derived) is being built from a Mutable one.
+   * This C-tor should be used when active MutableSegment is pushed into the compaction
+   * pipeline and becomes an ImmutableSegment.
    */
-  protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
-      MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean merge) {
-
-    super(null, // initiailize the CellSet with NULL
-        comparator, memStoreLAB);
-    this.type = type;
-    // build the new CellSet based on CellArrayMap
-    CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge);
-
-    this.setCellSet(null, cs);            // update the CellSet of the new Segment
+  protected ImmutableSegment(Segment segment) {
+    super(segment);
     this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
   }
 
-  /**------------------------------------------------------------------------
-   * C-tor to be used when new SKIP-LIST BASED ImmutableSegment is a result of compaction of a
-   * list of older ImmutableSegments.
-   * The given iterator returns the Cells that "survived" the compaction.
-   */
-  protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
-      MemStoreLAB memStoreLAB) {
-    super(new CellSet(comparator), // initiailize the CellSet with empty CellSet
-        comparator, memStoreLAB);
-    type = Type.SKIPLIST_MAP_BASED;
-    while (iterator.hasNext()) {
-      Cell c = iterator.next();
-      // The scanner is doing all the elimination logic
-      // now we just copy it to the new segment
-      Cell newKV = maybeCloneWithAllocator(c);
-      boolean usedMSLAB = (newKV != c);
-      internalAdd(newKV, usedMSLAB, null);
-    }
-    this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
-  }
 
   /////////////////////  PUBLIC METHODS  /////////////////////
-
   @Override
   public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
     return this.timeRange.includesTimeRange(scan.getTimeRange()) &&
@@ -148,107 +97,4 @@ public class ImmutableSegment extends Segment {
     List<Segment> res = new ArrayList<>(Arrays.asList(this));
     return res;
   }
-
-  /**------------------------------------------------------------------------
-   * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
-   * based on CellArrayMap.
-   * If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOOP
-   *
-   * Synchronization of the CellSet replacement:
-   * The reference to the CellSet is AtomicReference and is updated only when ImmutableSegment
-   * is constructed (single thread) or flattened. The flattening happens as part of a single
-   * thread of compaction, but to be on the safe side the initial CellSet is locally saved
-   * before the flattening and then replaced using CAS instruction.
-   */
-  public boolean flatten(MemstoreSize memstoreSize) {
-    if (isFlat()) return false;
-    CellSet oldCellSet = getCellSet();
-    int numOfCells = getCellsCount();
-
-    // build the new (CellSet CellArrayMap based)
-    CellSet  newCellSet = recreateCellArrayMapSet(numOfCells);
-    type = Type.ARRAY_MAP_BASED;
-    setCellSet(oldCellSet,newCellSet);
-
-    // arrange the meta-data size, decrease all meta-data sizes related to SkipList
-    // (recreateCellArrayMapSet doesn't take the care for the sizes)
-    long newSegmentSizeDelta = -(numOfCells * ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
-    // add size of CellArrayMap and meta-data overhead per Cell
-    newSegmentSizeDelta = newSegmentSizeDelta + numOfCells * ClassSize.CELL_ARRAY_MAP_ENTRY;
-    incSize(0, newSegmentSizeDelta);
-    if (memstoreSize != null) {
-      memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
-    }
-
-    return true;
-  }
-
-  /////////////////////  PRIVATE METHODS  /////////////////////
-  /*------------------------------------------------------------------------*/
-  // Create CellSet based on CellArrayMap from compacting iterator
-  private CellSet createCellArrayMapSet(int numOfCells, MemStoreSegmentsIterator iterator,
-      boolean merge) {
-
-    Cell[] cells = new Cell[numOfCells];   // build the Cell Array
-    int i = 0;
-    while (iterator.hasNext()) {
-      Cell c = iterator.next();
-      // The scanner behind the iterator is doing all the elimination logic
-      if (merge) {
-        // if this is merge we just move the Cell object without copying MSLAB
-        // the sizes still need to be updated in the new segment
-        cells[i] = c;
-      } else {
-        // now we just copy it to the new segment (also MSLAB copy)
-        cells[i] = maybeCloneWithAllocator(c);
-      }
-      boolean useMSLAB = (getMemStoreLAB()!=null);
-      // second parameter true, because in compaction/merge the addition of the cell to new segment
-      // is always successful
-      updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
-      i++;
-    }
-    // build the immutable CellSet
-    CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
-    return new CellSet(cam);
-  }
-
-  @Override
-  protected long heapSizeChange(Cell cell, boolean succ) {
-    if (succ) {
-      switch (this.type) {
-      case SKIPLIST_MAP_BASED:
-        return super.heapSizeChange(cell, succ);
-      case ARRAY_MAP_BASED:
-        return ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
-      }
-    }
-    return 0;
-  }
-
-  /*------------------------------------------------------------------------*/
-  // Create CellSet based on CellArrayMap from current ConcurrentSkipListMap based CellSet
-  // (without compacting iterator)
-  private CellSet recreateCellArrayMapSet(int numOfCells) {
-
-    Cell[] cells = new Cell[numOfCells];   // build the Cell Array
-    Cell curCell;
-    int idx = 0;
-    // create this segment scanner with maximal possible read point, to go over all Cells
-    KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
-
-    try {
-      while ((curCell = segmentScanner.next()) != null) {
-        cells[idx++] = curCell;
-      }
-    } catch (IOException ie) {
-      throw new IllegalStateException(ie);
-    } finally {
-      segmentScanner.close();
-    }
-
-    // build the immutable CellSet
-    CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false);
-    return new CellSet(cam);
-  }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 08af7fe..d4a5cf6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -80,7 +80,7 @@ public class MemStoreCompactor {
    * Note that every value covers the previous ones, i.e. if MERGE is the action it implies
    * that the youngest segment is going to be flatten anyway.
    */
-  private enum Action {
+  public enum Action {
     NOOP,
     FLATTEN,  // flatten the youngest segment in the pipeline
     MERGE,    // merge all the segments in the pipeline into one
@@ -160,7 +160,7 @@ public class MemStoreCompactor {
 
     if (action == Action.COMPACT) { // compact according to the user request
       LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
-          + " is going to be compacted, number of"
+          + " is going to be compacted to the " + compactingMemStore.getIndexType() + ". Number of"
           + " cells before compaction is " + versionedList.getNumOfCells());
       return Action.COMPACT;
     }
@@ -170,13 +170,15 @@ public class MemStoreCompactor {
     int numOfSegments = versionedList.getNumOfSegments();
     if (numOfSegments > pipelineThreshold) {
       LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
-          + " is going to be merged, as there are " + numOfSegments + " segments");
+          + " is going to be merged to the " + compactingMemStore.getIndexType()
+          + ", as there are " + numOfSegments + " segments");
       return Action.MERGE;          // to avoid too many segments, merge now
     }
 
     // if nothing of the above, then just flatten the newly joined segment
     LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store "
-        + compactingMemStore.getFamilyName() + " is going to be flattened");
+        + compactingMemStore.getFamilyName() + " is going to be flattened to the "
+        + compactingMemStore.getIndexType());
     return Action.FLATTEN;
   }
 
@@ -252,7 +254,7 @@ public class MemStoreCompactor {
 
       result = SegmentFactory.instance().createImmutableSegmentByCompaction(
           compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
-          versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED);
+          versionedList.getNumOfCells(), compactingMemStore.getIndexType());
       iterator.close();
       break;
     case MERGE:
@@ -263,8 +265,8 @@ public class MemStoreCompactor {
 
       result = SegmentFactory.instance().createImmutableSegmentByMerge(
           compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
-          versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED,
-          versionedList.getStoreSegments());
+          versionedList.getNumOfCells(), versionedList.getStoreSegments(),
+          compactingMemStore.getIndexType());
       iterator.close();
       break;
     default: throw new RuntimeException("Unknown action " + action); // sanity check

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
index 72e937c..7f0e549 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
@@ -83,6 +83,12 @@ public interface MemStoreLAB {
    */
   void decScannerCount();
 
+  /**
+   * Return a new empty chunk without considering this chunk as current
+   * The space on this chunk will be allocated externally
+   */
+  Chunk getNewExternalChunk();
+
   public static MemStoreLAB newInstance(Configuration conf) {
     MemStoreLAB memStoreLAB = null;
     if (isEnabled(conf)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 4fba82d..21b3cc9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -253,6 +253,17 @@ public class MemStoreLABImpl implements MemStoreLAB {
     return null;
   }
 
+  // Returning a new chunk, without replacing current chunk,
+  // meaning MSLABImpl does not make the returned chunk as CurChunk.
+  // The space on this chunk will be allocated externally
+  // The interface is only for external callers
+  @Override
+  public Chunk getNewExternalChunk() {
+    Chunk c = this.chunkCreator.getChunk();
+    chunks.add(c.getId());
+    return c;
+  }
+
   @VisibleForTesting
   Chunk getCurrentChunk() {
     return this.curChunk.get();

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
index 7361750..fb8d472 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -42,6 +42,7 @@ public class MutableSegment extends Segment {
 
   protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
     super(cellSet, comparator, memStoreLAB);
+    incSize(0,DEEP_OVERHEAD); // update the mutable segment metadata
   }
 
   /**
@@ -121,4 +122,8 @@ public class MutableSegment extends Segment {
   public long getMinTimestamp() {
     return this.timeRangeTracker.getMin();
   }
+
+  @Override protected long indexEntrySize() {
+      return ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index 8f43fa8..e30d031 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -305,6 +305,10 @@ public abstract class Segment {
     }
   }
 
+  protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemstoreSize memstoreSize) {
+    updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSize);
+  }
+
   /**
    * @return The increase in heap size because of this cell addition. This includes this cell POJO's
    *         heap size itself and additional overhead because of addition on to CSLM.
@@ -312,11 +316,13 @@ public abstract class Segment {
   protected long heapSizeChange(Cell cell, boolean succ) {
     if (succ) {
       return ClassSize
-          .align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
+          .align(indexEntrySize() + CellUtil.estimatedHeapSizeOf(cell));
     }
     return 0;
   }
 
+  protected abstract long indexEntrySize();
+
   /**
    * Returns a subset of the segment cell set, which starts with the given cell
    * @param firstCell a cell in the segment

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index 1a8b89d..d719a55 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -41,42 +40,36 @@ public final class SegmentFactory {
     return instance;
   }
 
-  // create skip-list-based (non-flat) immutable segment from compacting old immutable segments
-  public ImmutableSegment createImmutableSegment(final Configuration conf,
-      final CellComparator comparator, MemStoreSegmentsIterator iterator) {
-    return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
-  }
-
   // create composite immutable segment from a list of segments
+  // for snapshot consisting of multiple segments
   public CompositeImmutableSegment createCompositeImmutableSegment(
       final CellComparator comparator, List<ImmutableSegment> segments) {
     return new CompositeImmutableSegment(comparator, segments);
-
   }
 
   // create new flat immutable segment from compacting old immutable segments
+  // for compaction
   public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
-      ImmutableSegment.Type segmentType)
+      CompactingMemStore.IndexType idxType)
       throws IOException {
-    Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
-        "wrong immutable segment type");
+
     MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
     return
-        // the last parameter "false" means not to merge, but to compact the pipeline
-        // in order to create the new segment
-        new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false);
+        createImmutableSegment(
+            conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.COMPACT,idxType);
   }
 
   // create empty immutable segment
+  // for initializations
   public ImmutableSegment createImmutableSegment(CellComparator comparator) {
     MutableSegment segment = generateMutableSegment(null, comparator, null);
     return createImmutableSegment(segment);
   }
 
-  // create immutable segment from mutable segment
+  // create not-flat immutable segment from mutable segment
   public ImmutableSegment createImmutableSegment(MutableSegment segment) {
-    return new ImmutableSegment(segment);
+    return new CSLMImmutableSegment(segment);
   }
 
   // create mutable segment
@@ -86,19 +79,58 @@ public final class SegmentFactory {
   }
 
   // create new flat immutable segment from merging old immutable segments
+  // for merge
   public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
-      ImmutableSegment.Type segmentType, List<ImmutableSegment> segments)
+      List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType)
       throws IOException {
-    Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
-        "wrong immutable segment type");
+
     MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments);
     return
-        // the last parameter "true" means to merge the compaction pipeline
-        // in order to create the new segment
-        new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true);
+        createImmutableSegment(
+            conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.MERGE,idxType);
+
+  }
+
+  // create flat immutable segment from non-flat immutable segment
+  // for flattening
+  public ImmutableSegment createImmutableSegmentByFlattening(
+      CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType, MemstoreSize memstoreSize) {
+    ImmutableSegment res = null;
+    switch (idxType) {
+    case CHUNK_MAP:
+      res = new CellChunkImmutableSegment(segment, memstoreSize);
+      break;
+    case CSLM_MAP:
+      assert false; // non-flat segment can not be the result of flattening
+      break;
+    case ARRAY_MAP:
+      res = new CellArrayImmutableSegment(segment, memstoreSize);
+      break;
+    }
+    return res;
   }
+
+
   //****** private methods to instantiate concrete store segments **********//
+  private ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator,
+      MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells,
+      MemStoreCompactor.Action action, CompactingMemStore.IndexType idxType) {
+
+    ImmutableSegment res = null;
+    switch (idxType) {
+    case CHUNK_MAP:
+      res = new CellChunkImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action);
+      break;
+    case CSLM_MAP:
+      assert false; // non-flat segment can not be created here
+      break;
+    case ARRAY_MAP:
+      res = new CellArrayImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action);
+      break;
+    }
+    return res;
+  }
 
   private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator,
       MemStoreLAB memStoreLAB) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index bf74a9e..1af0d88 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -369,6 +369,7 @@ public class TestHeapSize  {
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicReference.class, true);
       ClassSize.estimateBase(CellSet.class, true);
       ClassSize.estimateBase(TimeRangeTracker.class, true);
@@ -376,9 +377,28 @@ public class TestHeapSize  {
       assertEquals(expected, actual);
     }
 
-    // ImmutableSegment Deep overhead
+    // ImmutableSegments Deep overhead
     cl = ImmutableSegment.class;
-    actual = ImmutableSegment.DEEP_OVERHEAD_CSLM;
+    actual = ImmutableSegment.DEEP_OVERHEAD;
+    expected = ClassSize.estimateBase(cl, false);
+    expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
+    expected += ClassSize.estimateBase(AtomicReference.class, false);
+    expected += ClassSize.estimateBase(CellSet.class, false);
+    expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
+    expected += ClassSize.estimateBase(TimeRange.class, false);
+    if (expected != actual) {
+      ClassSize.estimateBase(cl, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicReference.class, true);
+      ClassSize.estimateBase(CellSet.class, true);
+      ClassSize.estimateBase(TimeRangeTracker.class, true);
+      ClassSize.estimateBase(TimeRange.class, true);
+      assertEquals(expected, actual);
+    }
+
+    cl = CSLMImmutableSegment.class;
+    actual = CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
     expected = ClassSize.estimateBase(cl, false);
     expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
     expected += ClassSize.estimateBase(AtomicReference.class, false);
@@ -389,6 +409,7 @@ public class TestHeapSize  {
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicReference.class, true);
       ClassSize.estimateBase(CellSet.class, true);
       ClassSize.estimateBase(TimeRangeTracker.class, true);
@@ -396,7 +417,8 @@ public class TestHeapSize  {
       ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
       assertEquals(expected, actual);
     }
-    actual = ImmutableSegment.DEEP_OVERHEAD_CAM;
+    cl = CellArrayImmutableSegment.class;
+    actual = CellArrayImmutableSegment.DEEP_OVERHEAD_CAM;
     expected = ClassSize.estimateBase(cl, false);
     expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
     expected += ClassSize.estimateBase(AtomicReference.class, false);
@@ -407,6 +429,7 @@ public class TestHeapSize  {
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicReference.class, true);
       ClassSize.estimateBase(CellSet.class, true);
       ClassSize.estimateBase(TimeRangeTracker.class, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
index 5872d69..6307d32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -286,8 +287,8 @@ public class TestCellFlatSet extends TestCase {
 
     ByteBuffer idxBuffer = idxChunk.getData();  // the buffers of the chunks
     ByteBuffer dataBuffer = dataChunk.getData();
-    int dataOffset = Bytes.SIZEOF_INT;          // offset inside data buffer
-    int idxOffset = Bytes.SIZEOF_INT;           // skip the space for chunk ID
+    int dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;          // offset inside data buffer
+    int idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;           // skip the space for chunk ID
 
     Cell[] cellArray = asc ? ascCells : descCells;
 
@@ -296,16 +297,16 @@ public class TestCellFlatSet extends TestCase {
       if (dataOffset + KeyValueUtil.length(kv) > chunkCreator.getChunkSize()) {
         dataChunk = chunkCreator.getChunk();    // allocate more data chunks if needed
         dataBuffer = dataChunk.getData();
-        dataOffset = Bytes.SIZEOF_INT;
+        dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
       }
       int dataStartOfset = dataOffset;
       dataOffset = KeyValueUtil.appendTo(kv, dataBuffer, dataOffset, false); // write deep cell data
 
       // do we have enough space to write the cell-representation on the index chunk?
-      if (idxOffset + CellChunkMap.SIZEOF_CELL_REP > chunkCreator.getChunkSize()) {
+      if (idxOffset + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkCreator.getChunkSize()) {
         idxChunk = chunkCreator.getChunk();    // allocate more index chunks if needed
         idxBuffer = idxChunk.getData();
-        idxOffset = Bytes.SIZEOF_INT;
+        idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
         chunkArray[chunkArrayIdx++] = idxChunk;
       }
       idxOffset = ByteBufferUtils.putInt(idxBuffer, idxOffset, dataChunk.getId()); // write data chunk id
@@ -314,8 +315,6 @@ public class TestCellFlatSet extends TestCase {
       idxOffset = ByteBufferUtils.putLong(idxBuffer, idxOffset, kv.getSequenceId());     // seqId
     }
 
-    return asc ?
-        new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,false) :
-        new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,true);
+    return new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,!asc);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8ac43084/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 04435db..b0eadb5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
@@ -98,7 +99,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
         .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
     chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
-      globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
+        globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
     assertTrue(chunkCreator != null);
   }
 
@@ -563,16 +564,71 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     assertTrue(chunkCreator.getPoolSize() > 0);
   }
 
+  @Test
+  public void testFlatteningToCellChunkMap() throws IOException {
+
+    // set memstore to flat into CellChunkMap
+    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
+    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(compactionType));
+    ((CompactingMemStore)memstore).initiateType(compactionType);
+    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+        String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
+    ((CompactingMemStore)memstore).setIndexType();
+    int numOfCells = 8;
+    String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8
+
+    // make one cell
+    byte[] row = Bytes.toBytes(keys1[0]);
+    byte[] val = Bytes.toBytes(keys1[0] + 0);
+    KeyValue kv =
+        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
+            System.currentTimeMillis(), val);
+
+    // test 1 bucket
+    int totalCellsLen = addRowsByKeys(memstore, keys1);
+    long oneCellOnCSLMHeapSize =
+        ClassSize.align(
+            ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil
+                .length(kv));
+
+    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
+
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
+    // totalCellsLen should remain the same
+    long oneCellOnCCMHeapSize =
+        ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
+    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+        + numOfCells * oneCellOnCCMHeapSize;
+
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
+
+    MemstoreSize size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.decrMemstoreSize(size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(numOfCells, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // Compaction tests
   //////////////////////////////////////////////////////////////////////////////
   @Test
   public void testCompaction1Bucket() throws IOException {
 
-    // set memstore to do data compaction and not to use the speculative scan
-    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
-    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
-        String.valueOf(compactionType));
+    // set memstore to do basic structure flattening, the "eager" option is tested in
+    // TestCompactingToCellFlatMapMemStore
+    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
+    memstore.getConfiguration()
+        .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType));
     ((CompactingMemStore)memstore).initiateType(compactionType);
 
     String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
@@ -581,25 +637,24 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     int totalCellsLen = addRowsByKeys(memstore, keys1);
     int oneCellOnCSLMHeapSize = 120;
     int oneCellOnCAHeapSize = 88;
-    long totalHeapSize = 4 * oneCellOnCSLMHeapSize;
+    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
     assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
-    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
-    // totalCellsLen
-    totalCellsLen = (totalCellsLen * 3) / 4;
-    totalHeapSize = 3 * oneCellOnCAHeapSize;
+    // There is no compaction, as the compacting memstore type is basic.
+    // totalCellsLen remains the same
+    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+        + 4 * oneCellOnCAHeapSize;
     assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
-    size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
     region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
-    assertEquals(3, s.getCellsCount());
+    assertEquals(4, s.getCellsCount());
     assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
@@ -608,8 +663,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
   @Test
   public void testCompaction2Buckets() throws IOException {
 
-    // set memstore to do data compaction and not to use the speculative scan
-    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
+    // set memstore to do basic structure flattening, the "eager" option is tested in
+    // TestCompactingToCellFlatMapMemStore
+    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
     ((CompactingMemStore)memstore).initiateType(compactionType);
@@ -619,44 +675,43 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     int totalCellsLen1 = addRowsByKeys(memstore, keys1);
     int oneCellOnCSLMHeapSize = 120;
     int oneCellOnCAHeapSize = 88;
-    long totalHeapSize = 4 * oneCellOnCSLMHeapSize;
+    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
 
     assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
-    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     int counter = 0;
     for ( Segment s : memstore.getSegments()) {
       counter += s.getCellsCount();
     }
-    assertEquals(3, counter);
+    assertEquals(4, counter);
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting
-    // totalCellsLen
-    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    // There is no compaction, as the compacting memstore type is basic.
+    // totalCellsLen remains the same
     assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
-    totalHeapSize = 3 * oneCellOnCAHeapSize;
+    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+        + 4 * oneCellOnCAHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
     int totalCellsLen2 = addRowsByKeys(memstore, keys2);
     totalHeapSize += 3 * oneCellOnCSLMHeapSize;
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
+    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
 
-    size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    totalHeapSize = 4 * oneCellOnCAHeapSize;
+    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+        + 7 * oneCellOnCAHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
     region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
-    assertEquals(4, s.getCellsCount());
+    assertEquals(7, s.getCellsCount());
     assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
@@ -678,10 +733,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     int oneCellOnCSLMHeapSize = 120;
     int oneCellOnCAHeapSize = 88;
     assertEquals(totalCellsLen1, region.getMemstoreSize());
-    long totalHeapSize = 4 * oneCellOnCSLMHeapSize;
+    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
-
-    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
 
     assertEquals(0, memstore.getSnapshot().getCellsCount());
@@ -690,29 +743,31 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     totalCellsLen1 = (totalCellsLen1 * 3) / 4;
     assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
     // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
-    totalHeapSize = 3 * oneCellOnCAHeapSize;
+    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+        + 3 * oneCellOnCAHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
     int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells.
-    long totalHeapSize2 = 3 * oneCellOnCSLMHeapSize;
+    long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;
 
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+    assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
 
     ((CompactingMemStore) memstore).disableCompaction();
-    size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
     assertEquals(0, memstore.getSnapshot().getCellsCount());
     // No change in the cells data size. ie. memstore size. as there is no compaction.
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+    assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
+        ((CompactingMemStore) memstore).heapSize());
 
     int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added
     assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
         regionServicesForStores.getMemstoreSize());
-    long totalHeapSize3 = 3 * oneCellOnCSLMHeapSize;
-    assertEquals(totalHeapSize + totalHeapSize2 + totalHeapSize3,
-        ((CompactingMemStore) memstore).heapSize());
+    long totalHeapSize3 = totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+        + 3 * oneCellOnCSLMHeapSize;
+    assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
 
     ((CompactingMemStore)memstore).enableCompaction();
     size = memstore.getFlushableSize();
@@ -725,7 +780,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
         regionServicesForStores.getMemstoreSize());
     // Only 4 unique cells left
-    assertEquals(4 * oneCellOnCAHeapSize, ((CompactingMemStore) memstore).heapSize());
+    assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
+        + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot


Mime
View raw message