hbase-commits mailing list archives

From anasta...@apache.org
Subject [2/2] hbase git commit: HBASE-18010: CellChunkMap integration into CompactingMemStore, merge to Release 2.0
Date Sun, 01 Oct 2017 08:01:48 GMT
HBASE-18010: CellChunkMap integration into CompactingMemStore, merge to Release 2.0


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/95405fbd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/95405fbd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/95405fbd

Branch: refs/heads/branch-2
Commit: 95405fbd83e45e22f8608ff7966cb05fd1103729
Parents: 7ee44a8
Author: anastas <anastas@yahoo-inc.com>
Authored: Sun Oct 1 11:01:10 2017 +0300
Committer: anastas <anastas@yahoo-inc.com>
Committed: Sun Oct 1 11:01:10 2017 +0300

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/util/ClassSize.java |  29 +-
 .../hadoop/hbase/regionserver/CellChunkMap.java |  33 +-
 .../hadoop/hbase/regionserver/ChunkCreator.java |  27 +-
 .../hbase/regionserver/CompactingMemStore.java  |  14 +-
 .../hbase/regionserver/CompactionPipeline.java  |  28 +-
 .../regionserver/CompositeImmutableSegment.java |  10 +
 .../regionserver/ImmutableMemStoreLAB.java      |   9 +
 .../hbase/regionserver/ImmutableSegment.java    | 182 +------
 .../hbase/regionserver/MemStoreCompactor.java   |  35 +-
 .../hadoop/hbase/regionserver/MemStoreLAB.java  |   6 +
 .../hbase/regionserver/MemStoreLABImpl.java     |  11 +
 .../hbase/regionserver/MutableSegment.java      |   5 +
 .../hadoop/hbase/regionserver/Segment.java      |   8 +-
 .../hbase/regionserver/SegmentFactory.java      |  76 ++-
 .../apache/hadoop/hbase/io/TestHeapSize.java    |  29 +-
 .../hbase/regionserver/TestCellFlatSet.java     |  15 +-
 .../regionserver/TestCompactingMemStore.java    | 134 +++--
 .../TestCompactingToCellArrayMapMemStore.java   | 492 -------------------
 .../hbase/regionserver/TestDefaultMemStore.java |   7 +-
 .../regionserver/TestHRegionReplayEvents.java   |   7 +-
 .../hadoop/hbase/regionserver/TestHStore.java   |  10 +-
 .../regionserver/TestPerColumnFamilyFlush.java  |  21 +-
 .../TestWalAndCompactingMemStoreFlush.java      |  28 +-
 23 files changed, 398 insertions(+), 818 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index 5c793eb..d63b1cf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -89,6 +89,15 @@ public class ClassSize {
   /** Overhead for ConcurrentSkipListMap Entry */
   public static final int CONCURRENT_SKIPLISTMAP_ENTRY;
 
+  /** Overhead for CellFlatMap */
+  public static final int CELL_FLAT_MAP;
+
+  /** Overhead for CellChunkMap */
+  public static final int CELL_CHUNK_MAP;
+
+  /** Overhead for Cell Chunk Map Entry */
+  public static final int CELL_CHUNK_MAP_ENTRY;
+
   /** Overhead for CellArrayMap */
   public static final int CELL_ARRAY_MAP;
 
@@ -275,13 +284,17 @@ public class ClassSize {
     // The size changes from jdk7 to jdk8, estimate the size rather than use a conditional
     CONCURRENT_SKIPLISTMAP = (int) estimateBase(ConcurrentSkipListMap.class, false);
 
-    // CELL_ARRAY_MAP is the size of an instance of CellArrayMap class, which extends
-    // CellFlatMap class. CellArrayMap object containing a ref to an Array, so
-    // OBJECT + REFERENCE + ARRAY
     // CellFlatMap object contains two integers, one boolean and one reference to object, so
     // 2*INT + BOOLEAN + REFERENCE
-    CELL_ARRAY_MAP = align(OBJECT + 2*Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN
-        + ARRAY + 2*REFERENCE);
+    CELL_FLAT_MAP = OBJECT + 2*Bytes.SIZEOF_INT + Bytes.SIZEOF_BOOLEAN + REFERENCE;
+
+    // CELL_ARRAY_MAP is the size of an instance of CellArrayMap class, which extends
+    // CellFlatMap class. CellArrayMap object containing a ref to an Array of Cells
+    CELL_ARRAY_MAP = align(CELL_FLAT_MAP + REFERENCE + ARRAY);
+
+    // CELL_CHUNK_MAP is the size of an instance of CellChunkMap class, which extends
+    // CellFlatMap class. CellChunkMap object containing a ref to an Array of Chunks
+    CELL_CHUNK_MAP = align(CELL_FLAT_MAP + REFERENCE + ARRAY);
 
     CONCURRENT_SKIPLISTMAP_ENTRY = align(
         align(OBJECT + (3 * REFERENCE)) + /* one node per entry */
@@ -290,6 +303,12 @@ public class ClassSize {
     // REFERENCE in the CellArrayMap all the rest is counted in KeyValue.heapSize()
     CELL_ARRAY_MAP_ENTRY = align(REFERENCE);
 
+    // The Cell Representation in the CellChunkMap, the Cell object size shouldn't be counted
+    // in KeyValue.heapSize()
+    // each cell-representation requires three integers for chunkID (reference to the ByteBuffer),
+    // offset and length, and one long for seqID
+    CELL_CHUNK_MAP_ENTRY = 3*Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG;
+
     REENTRANT_LOCK = align(OBJECT + (3 * REFERENCE));
 
     ATOMIC_LONG = align(OBJECT + Bytes.SIZEOF_LONG);
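
For orientation, the new constants work out simply: CELL_CHUNK_MAP_ENTRY is 3*4 + 8 = 20 bytes, and a chunk of size S holds (S - SIZEOF_CHUNK_HEADER) / 20 cell-representations (SIZEOF_CHUNK_HEADER, one int, is introduced in ChunkCreator below). A minimal standalone sketch of the arithmetic (plain Java, no HBase imports; the 2 MB chunk size is an assumed default, cf. MemStoreLABImpl.CHUNK_SIZE_DEFAULT):

public class CellChunkMapSizeSketch {
  static final int SIZEOF_INT = 4, SIZEOF_LONG = 8;
  // chunkID + offset + length (three ints) + seqID (one long) = 20 bytes
  static final int CELL_CHUNK_MAP_ENTRY = 3 * SIZEOF_INT + SIZEOF_LONG;

  public static void main(String[] args) {
    int chunkSize = 2 * 1024 * 1024;   // assumed default MSLAB chunk size (2 MB)
    int chunkHeader = SIZEOF_INT;      // SIZEOF_CHUNK_HEADER: the chunk's own ID
    System.out.println("bytes per entry: " + CELL_CHUNK_MAP_ENTRY);   // 20
    System.out.println("cell-reps per index chunk: "
        + (chunkSize - chunkHeader) / CELL_CHUNK_MAP_ENTRY);          // 104857
  }
}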

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
index 78315a8..388b675 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkMap.java
@@ -21,11 +21,12 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.ClassSize;
 
 import java.util.Comparator;
 
@@ -56,11 +57,12 @@ import java.util.Comparator;
 public class CellChunkMap extends CellFlatMap {
 
   private final Chunk[] chunks;             // the array of chunks, on which the index is based
-  private final int numOfCellsInsideChunk;  // constant number of cell-representations in a chunk
 
-  // each cell-representation requires three integers for chunkID (reference to the ByteBuffer),
-  // offset and length, and one long for seqID
-  public static final int SIZEOF_CELL_REP = 3*Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG ;
+  // constant number of cell-representations in a chunk
+  // each chunk starts with its own ID following the cells data
+  public static final int NUM_OF_CELL_REPS_IN_CHUNK =
+      (ChunkCreator.getInstance().getChunkSize() - ChunkCreator.SIZEOF_CHUNK_HEADER) /
+          ClassSize.CELL_CHUNK_MAP_ENTRY;
 
   /**
    * C-tor for creating CellChunkMap from existing Chunk array, which must be ordered
@@ -75,9 +77,6 @@ public class CellChunkMap extends CellFlatMap {
       Chunk[] chunks, int min, int max, boolean descending) {
     super(comparator, min, max, descending);
     this.chunks = chunks;
-    this.numOfCellsInsideChunk = // each chunk starts with its own ID following the cells data
-        (ChunkCreator.getInstance().getChunkSize() - Bytes.SIZEOF_INT) / SIZEOF_CELL_REP;
-
   }
 
   /* To be used by base (CellFlatMap) class only to create a sub-CellFlatMap
@@ -91,20 +90,20 @@ public class CellChunkMap extends CellFlatMap {
   @Override
   protected Cell getCell(int i) {
     // get the index of the relevant chunk inside chunk array
-    int chunkIndex = (i / numOfCellsInsideChunk);
+    int chunkIndex = (i / NUM_OF_CELL_REPS_IN_CHUNK);
     ByteBuffer block = chunks[chunkIndex].getData();// get the ByteBuffer of the relevant chunk
-    int j = i - chunkIndex * numOfCellsInsideChunk; // get the index of the cell-representation
+    int j = i - chunkIndex * NUM_OF_CELL_REPS_IN_CHUNK; // get the index of the cell-representation
 
     // find inside the offset inside the chunk holding the index, skip bytes for chunk id
-    int offsetInBytes = Bytes.SIZEOF_INT + j* SIZEOF_CELL_REP;
-
+    int offsetInBytes = ChunkCreator.SIZEOF_CHUNK_HEADER + j* ClassSize.CELL_CHUNK_MAP_ENTRY;
 
     // find the chunk holding the data of the cell, the chunkID is stored first
     int chunkId = ByteBufferUtils.toInt(block, offsetInBytes);
     Chunk chunk = ChunkCreator.getInstance().getChunk(chunkId);
     if (chunk == null) {
-      // this should not happen, putting an assertion here at least for the testing period
-      assert false;
+      // this should not happen
+      throw new IllegalArgumentException("In CellChunkMap, cell must be associated with chunk."
+          + " We were looking for a cell at index " + i);
     }
 
     // find the offset of the data of the cell, skip integer for chunkID, offset is stored second
@@ -118,8 +117,10 @@ public class CellChunkMap extends CellFlatMap {
 
     ByteBuffer buf = chunk.getData();   // get the ByteBuffer where the cell data is stored
     if (buf == null) {
-      // this should not happen, putting an assertion here at least for the testing period
-      assert false;
+      // this should not happen
+      throw new IllegalArgumentException("In CellChunkMap, chunk must be associated with ByteBuffer."
+          + " Chunk: " + chunk + " Chunk ID: " + chunk.getId() + ", is from pool: "
+          + chunk.isFromPool() + ". We were looking for a cell at index " + i);
     }
 
     return new ByteBufferChunkCell(buf, offsetOfCell, lengthOfCell, cellSeqID);
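
The lookup in getCell(int i) above boils down to fixed-offset reads inside an index chunk. A hypothetical standalone sketch of that math (plain java.nio, with ByteBuffer standing in for Chunk; names are illustrative, not HBase API):

import java.nio.ByteBuffer;

class CellLookupSketch {
  static final int CHUNK_HEADER = 4;      // each chunk stores its own ID first
  static final int ENTRY = 3 * 4 + 8;     // chunkID, offset, length, seqID = 20 bytes

  // Decode flat index i into the four fields of one cell-representation.
  static long[] readCellRep(ByteBuffer[] indexChunks, int repsPerChunk, int i) {
    ByteBuffer block = indexChunks[i / repsPerChunk];   // index chunk holding entry i
    int j = i % repsPerChunk;                           // entry position inside that chunk
    int off = CHUNK_HEADER + j * ENTRY;                 // skip the chunk's own ID
    int dataChunkId = block.getInt(off);                // first int: data chunk ID
    int cellOffset  = block.getInt(off + 4);            // second int: offset of cell data
    int cellLength  = block.getInt(off + 8);            // third int: length of cell data
    long seqId      = block.getLong(off + 12);          // trailing long: sequence ID
    return new long[] { dataChunkId, cellOffset, cellLength, seqId };
  }
}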

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
index 6314f39..551fdb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,7 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -36,10 +35,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated
@@ -51,7 +51,14 @@ public class ChunkCreator {
   // monotonically increasing chunkid
   private AtomicInteger chunkID = new AtomicInteger(1);
   // maps the chunk against the monotonically increasing chunk id. We need to preserve the
-  // natural ordering of the key. It also helps to protect from GC.
+  // natural ordering of the key
+  // CellChunkMap creation should convert the weak ref to hard reference
+
+  // chunk id of each chunk is the first integer written on each chunk,
+  // the header size needs to be changed in case the chunk id size is changed
+  public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT;
+
+  // mapping from chunk IDs to chunks
   private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, Chunk>();
 
   private final int chunkSize;
@@ -137,10 +144,6 @@ public class ChunkCreator {
     return chunk;
   }
 
-  private Chunk createChunkForPool() {
-    return createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
-  }
-
   /**
    * Creates the chunk either onheap or offheap
    * @param pool indicates if the chunks have to be created which will be used by the Pool
@@ -226,7 +229,8 @@ public class ChunkCreator {
       this.reclaimedChunks = new LinkedBlockingQueue<>();
       for (int i = 0; i < initialCount; i++) {
         // Chunks from pool are covered with strong references anyway
-        Chunk chunk = createChunkForPool();
+        // TODO: change to CHUNK_MAP if it is generally defined
+        Chunk chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
         chunk.init();
         reclaimedChunks.add(chunk);
       }
@@ -258,7 +262,8 @@ public class ChunkCreator {
           long created = this.chunkCount.get();
           if (created < this.maxCount) {
             if (this.chunkCount.compareAndSet(created, created + 1)) {
-              chunk = createChunkForPool();
+              // TODO: change to CHUNK_MAP if it is generally defined
+              chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
               break;
             }
           } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 4f3f15b..da502c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -314,7 +314,19 @@ public class CompactingMemStore extends AbstractMemStore {
    *           The flattening happens only if versions match.
    */
   public void flattenOneSegment(long requesterVersion) {
-    pipeline.flattenYoungestSegment(requesterVersion);
+    pipeline.flattenOneSegment(requesterVersion, indexType);
+  }
+
+  // setter is used only for testability
+  @VisibleForTesting
+  public void setIndexType() {
+    indexType = IndexType.valueOf(getConfiguration().get(
+        CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+        CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT));
+  }
+
+  public IndexType getIndexType() {
+    return indexType;
   }
 
   public boolean hasImmutableSegments() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 3983375..7fb8df4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.util.ClassSize;
  * method accesses the read-only copy more than once it makes a local copy of it
  * to ensure it accesses the same copy.
  *
- * The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also
+ * The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also
  * protected by a lock since they need to have a consistent (atomic) view of the pipeline list
  * and version number.
  */
@@ -183,7 +183,7 @@ public class CompactionPipeline {
    *
    * @return true iff a segment was successfully flattened
    */
-  public boolean flattenYoungestSegment(long requesterVersion) {
+  public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType) {
 
     if(requesterVersion != version) {
       LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
@@ -196,17 +196,22 @@ public class CompactionPipeline {
         LOG.warn("Segment flattening failed, because versions do not match");
         return false;
       }
-
+      int i = 0;
       for (ImmutableSegment s : pipeline) {
-        // remember the old size in case this segment is going to be flatten
-        MemstoreSize memstoreSize = new MemstoreSize();
-        if (s.flatten(memstoreSize)) {
+        if ( s.canBeFlattened() ) {
+          MemstoreSize newMemstoreSize = new MemstoreSize(); // the size to be updated
+          ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
+              (CSLMImmutableSegment)s,idxType,newMemstoreSize);
+          replaceAtIndex(i,newS);
           if(region != null) {
-            region.addMemstoreSize(memstoreSize);
+            // update the global memstore size counter
+            // upon flattening there is no change in the data size
+            region.addMemstoreSize(new MemstoreSize(0, newMemstoreSize.getHeapSize()));
           }
           LOG.debug("Compaction pipeline segment " + s + " was flattened");
           return true;
         }
+        i++;
       }
 
     }
@@ -271,12 +276,19 @@ public class CompactionPipeline {
     }
   }
 
+  // replacing one segment in the pipeline with a new one exactly at the same index
+  // must be called only within a synchronized block
+  private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
+    pipeline.set(idx, newSegment);
+    readOnlyCopy = new LinkedList<>(pipeline);
+  }
+
   public Segment getTail() {
     List<? extends Segment> localCopy = getSegments();
     if(localCopy.isEmpty()) {
       return null;
     }
-    return localCopy.get(localCopy.size()-1);
+    return localCopy.get(localCopy.size() - 1);
   }
 
   private boolean addFirst(ImmutableSegment segment) {
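
replaceAtIndex() above leans on the pipeline's copy-on-write discipline: writers mutate the master list inside the synchronized block and republish a fresh read-only snapshot, so readers never take a lock. A minimal sketch of the pattern (plain Java, illustrative names only):

import java.util.LinkedList;
import java.util.List;

class PipelineSketch<T> {
  private final LinkedList<T> pipeline = new LinkedList<>();
  private volatile List<T> readOnlyCopy = new LinkedList<>();

  // Swap one segment in place; must run under the pipeline lock.
  synchronized void replaceAtIndex(int idx, T newSegment) {
    pipeline.set(idx, newSegment);               // same position, new segment
    readOnlyCopy = new LinkedList<>(pipeline);   // republish snapshot for readers
  }

  List<T> getSegments() { return readOnlyCopy; } // readers never lock
}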

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
index bd5afac..1bae267 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompositeImmutableSegment.java
@@ -184,6 +184,16 @@ public class CompositeImmutableSegment extends ImmutableSegment {
     throw new IllegalStateException("Not supported by CompositeImmutableScanner");
   }
 
+
+  @Override
+  protected long indexEntrySize() {
+    throw new IllegalStateException("Not supported by CompositeImmutableScanner");
+  }
+
+  @Override protected boolean canBeFlattened() {
+    return false;
+  }
+
   /**
    * @return Sum of all cell sizes.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
index 4a14e10..b035f1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableMemStoreLAB.java
@@ -46,6 +46,15 @@ public class ImmutableMemStoreLAB implements MemStoreLAB {
   }
 
   @Override
+  // returning a new chunk, without replacing current chunk,
+  // the space on this chunk will be allocated externally
+  // use the first MemStoreLABImpl in the list
+  public Chunk getNewExternalChunk() {
+    MemStoreLAB mslab = this.mslabs.get(0);
+    return mslab.getNewExternalChunk();
+  }
+
+  @Override
   public void close() {
     // 'openScannerCount' here tracks the scanners opened on segments which directly refer to this
     // MSLAB. The individual MSLABs this refers also having its own 'openScannerCount'. The usage of

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index c82573d..aacd189 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -19,14 +19,11 @@
 package org.apache.hadoop.hbase.regionserver;
 
 
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.io.TimeRange;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -36,21 +33,11 @@ import java.util.List;
  * and is not needed for a {@link MutableSegment}.
  */
 @InterfaceAudience.Private
-public class ImmutableSegment extends Segment {
+public abstract class ImmutableSegment extends Segment {
 
-  private static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
-      + (2 * ClassSize.REFERENCE) // Refs to timeRange and type
-      + ClassSize.TIMERANGE;
-  public static final long DEEP_OVERHEAD_CSLM = DEEP_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP;
-  public static final long DEEP_OVERHEAD_CAM = DEEP_OVERHEAD + ClassSize.CELL_ARRAY_MAP;
-
-  /**
-   * Types of ImmutableSegment
-   */
-  public enum Type {
-    SKIPLIST_MAP_BASED,
-    ARRAY_MAP_BASED,
-  }
+  public static final long DEEP_OVERHEAD = Segment.DEEP_OVERHEAD
+      + ClassSize.align(ClassSize.REFERENCE // Referent to timeRange
+      + ClassSize.TIMERANGE);
 
   /**
    * This is an immutable segment so use the read-only TimeRange rather than the heavy-weight
@@ -58,12 +45,8 @@ public class ImmutableSegment extends Segment {
    */
   private final TimeRange timeRange;
 
-  private Type type = Type.SKIPLIST_MAP_BASED;
-
-  // whether it is based on CellFlatMap or ConcurrentSkipListMap
-  private boolean isFlat(){
-    return (type != Type.SKIPLIST_MAP_BASED);
-  }
+  // each sub-type of immutable segment knows whether it is flat or not
+  protected abstract boolean canBeFlattened();
 
   /////////////////////  CONSTRUCTORS  /////////////////////
   /**------------------------------------------------------------------------
@@ -75,59 +58,25 @@ public class ImmutableSegment extends Segment {
   }
 
   /**------------------------------------------------------------------------
-   * Copy C-tor to be used when new ImmutableSegment is being built from a Mutable one.
-   * This C-tor should be used when active MutableSegment is pushed into the compaction
-   * pipeline and becomes an ImmutableSegment.
+   * C-tor to be used to build the derived classes
    */
-  protected ImmutableSegment(Segment segment) {
-    super(segment);
-    this.type = Type.SKIPLIST_MAP_BASED;
+  protected ImmutableSegment(CellSet cs, CellComparator comparator, MemStoreLAB memStoreLAB) {
+    super(cs, comparator, memStoreLAB);
     this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
   }
 
   /**------------------------------------------------------------------------
-   * C-tor to be used when new CELL_ARRAY BASED ImmutableSegment is a result of compaction of a
-   * list of older ImmutableSegments.
-   * The given iterator returns the Cells that "survived" the compaction.
-   * The input parameter "type" exists for future use when more types of flat ImmutableSegments
-   * are going to be introduced.
+   * Copy C-tor to be used when new CSLMImmutableSegment (derived) is being built from a Mutable one.
+   * This C-tor should be used when active MutableSegment is pushed into the compaction
+   * pipeline and becomes an ImmutableSegment.
    */
-  protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
-      MemStoreLAB memStoreLAB, int numOfCells, Type type, boolean merge) {
-
-    super(null, // initiailize the CellSet with NULL
-        comparator, memStoreLAB);
-    this.type = type;
-    // build the new CellSet based on CellArrayMap
-    CellSet cs = createCellArrayMapSet(numOfCells, iterator, merge);
-
-    this.setCellSet(null, cs);            // update the CellSet of the new Segment
+  protected ImmutableSegment(Segment segment) {
+    super(segment);
     this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
   }
 
-  /**------------------------------------------------------------------------
-   * C-tor to be used when new SKIP-LIST BASED ImmutableSegment is a result of compaction of a
-   * list of older ImmutableSegments.
-   * The given iterator returns the Cells that "survived" the compaction.
-   */
-  protected ImmutableSegment(CellComparator comparator, MemStoreSegmentsIterator iterator,
-      MemStoreLAB memStoreLAB) {
-    super(new CellSet(comparator), // initiailize the CellSet with empty CellSet
-        comparator, memStoreLAB);
-    type = Type.SKIPLIST_MAP_BASED;
-    while (iterator.hasNext()) {
-      Cell c = iterator.next();
-      // The scanner is doing all the elimination logic
-      // now we just copy it to the new segment
-      Cell newKV = maybeCloneWithAllocator(c);
-      boolean usedMSLAB = (newKV != c);
-      internalAdd(newKV, usedMSLAB, null);
-    }
-    this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
-  }
 
   /////////////////////  PUBLIC METHODS  /////////////////////
-
   @Override
   public boolean shouldSeek(TimeRange tr, long oldestUnexpiredTS) {
     return this.timeRange.includesTimeRange(tr) &&
@@ -147,107 +96,4 @@ public class ImmutableSegment extends Segment {
     List<Segment> res = new ArrayList<>(Arrays.asList(this));
     return res;
   }
-
-  /**------------------------------------------------------------------------
-   * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
-   * based on CellArrayMap.
-   * If this ImmutableSegment is not based on ConcurrentSkipListMap , this is NOOP
-   *
-   * Synchronization of the CellSet replacement:
-   * The reference to the CellSet is AtomicReference and is updated only when ImmutableSegment
-   * is constructed (single thread) or flattened. The flattening happens as part of a single
-   * thread of compaction, but to be on the safe side the initial CellSet is locally saved
-   * before the flattening and then replaced using CAS instruction.
-   */
-  public boolean flatten(MemstoreSize memstoreSize) {
-    if (isFlat()) return false;
-    CellSet oldCellSet = getCellSet();
-    int numOfCells = getCellsCount();
-
-    // build the new (CellSet CellArrayMap based)
-    CellSet  newCellSet = recreateCellArrayMapSet(numOfCells);
-    type = Type.ARRAY_MAP_BASED;
-    setCellSet(oldCellSet,newCellSet);
-
-    // arrange the meta-data size, decrease all meta-data sizes related to SkipList
-    // (recreateCellArrayMapSet doesn't take the care for the sizes)
-    long newSegmentSizeDelta = -(numOfCells * ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
-    // add size of CellArrayMap and meta-data overhead per Cell
-    newSegmentSizeDelta = newSegmentSizeDelta + numOfCells * ClassSize.CELL_ARRAY_MAP_ENTRY;
-    incSize(0, newSegmentSizeDelta);
-    if (memstoreSize != null) {
-      memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
-    }
-
-    return true;
-  }
-
-  /////////////////////  PRIVATE METHODS  /////////////////////
-  /*------------------------------------------------------------------------*/
-  // Create CellSet based on CellArrayMap from compacting iterator
-  private CellSet createCellArrayMapSet(int numOfCells, MemStoreSegmentsIterator iterator,
-      boolean merge) {
-
-    Cell[] cells = new Cell[numOfCells];   // build the Cell Array
-    int i = 0;
-    while (iterator.hasNext()) {
-      Cell c = iterator.next();
-      // The scanner behind the iterator is doing all the elimination logic
-      if (merge) {
-        // if this is merge we just move the Cell object without copying MSLAB
-        // the sizes still need to be updated in the new segment
-        cells[i] = c;
-      } else {
-        // now we just copy it to the new segment (also MSLAB copy)
-        cells[i] = maybeCloneWithAllocator(c);
-      }
-      boolean useMSLAB = (getMemStoreLAB()!=null);
-      // second parameter true, because in compaction/merge the addition of the cell to new segment
-      // is always successful
-      updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
-      i++;
-    }
-    // build the immutable CellSet
-    CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, i, false);
-    return new CellSet(cam);
-  }
-
-  @Override
-  protected long heapSizeChange(Cell cell, boolean succ) {
-    if (succ) {
-      switch (this.type) {
-      case SKIPLIST_MAP_BASED:
-        return super.heapSizeChange(cell, succ);
-      case ARRAY_MAP_BASED:
-        return ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
-      }
-    }
-    return 0;
-  }
-
-  /*------------------------------------------------------------------------*/
-  // Create CellSet based on CellArrayMap from current ConcurrentSkipListMap based CellSet
-  // (without compacting iterator)
-  private CellSet recreateCellArrayMapSet(int numOfCells) {
-
-    Cell[] cells = new Cell[numOfCells];   // build the Cell Array
-    Cell curCell;
-    int idx = 0;
-    // create this segment scanner with maximal possible read point, to go over all Cells
-    KeyValueScanner segmentScanner = this.getScanner(Long.MAX_VALUE);
-
-    try {
-      while ((curCell = segmentScanner.next()) != null) {
-        cells[idx++] = curCell;
-      }
-    } catch (IOException ie) {
-      throw new IllegalStateException(ie);
-    } finally {
-      segmentScanner.close();
-    }
-
-    // build the immutable CellSet
-    CellArrayMap cam = new CellArrayMap(getComparator(), cells, 0, idx, false);
-    return new CellSet(cam);
-  }
 }
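
After this refactor ImmutableSegment is abstract and the old Type enum gives way to subclasses (CSLMImmutableSegment, CellArrayImmutableSegment, CellChunkImmutableSegment, added elsewhere in this commit). A simplified sketch of the resulting shape (entry sizes are illustrative placeholders, not the real ClassSize values):

abstract class ImmutableSegmentSketch {
  abstract boolean canBeFlattened();   // only the skip-list variant can still be flattened
  abstract long indexEntrySize();      // per-cell index overhead, cf. Segment.heapSizeChange()
}

class CslmSegmentSketch extends ImmutableSegmentSketch {      // ConcurrentSkipListMap based
  boolean canBeFlattened() { return true; }
  long indexEntrySize() { return 48; }  // placeholder for CONCURRENT_SKIPLISTMAP_ENTRY
}

class CellArraySegmentSketch extends ImmutableSegmentSketch { // CellArrayMap based
  boolean canBeFlattened() { return false; }
  long indexEntrySize() { return 8; }   // placeholder for CELL_ARRAY_MAP_ENTRY (one reference)
}

class CellChunkSegmentSketch extends ImmutableSegmentSketch { // CellChunkMap based
  boolean canBeFlattened() { return false; }
  long indexEntrySize() { return 20; }  // CELL_CHUNK_MAP_ENTRY: three ints + one long
}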

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
index 49a057c..410259d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java
@@ -80,7 +80,7 @@ public class MemStoreCompactor {
    * Note that every value covers the previous ones, i.e. if MERGE is the action it implies
    * that the youngest segment is going to be flatten anyway.
    */
-  private enum Action {
+  public enum Action {
     NOOP,
     FLATTEN,  // flatten the youngest segment in the pipeline
     MERGE,    // merge all the segments in the pipeline into one
@@ -160,7 +160,7 @@ public class MemStoreCompactor {
 
     if (action == Action.COMPACT) { // compact according to the user request
       LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
-          + " is going to be compacted, number of"
+          + " is going to be compacted to the " + compactingMemStore.getIndexType() + ". Number of"
           + " cells before compaction is " + versionedList.getNumOfCells());
       return Action.COMPACT;
     }
@@ -170,13 +170,15 @@ public class MemStoreCompactor {
     int numOfSegments = versionedList.getNumOfSegments();
     if (numOfSegments > pipelineThreshold) {
       LOG.debug("In-Memory Compaction Pipeline for store " + compactingMemStore.getFamilyName()
-          + " is going to be merged, as there are " + numOfSegments + " segments");
+          + " is going to be merged to the " + compactingMemStore.getIndexType()
+          + ", as there are " + numOfSegments + " segments");
       return Action.MERGE;          // to avoid too many segments, merge now
     }
 
     // if nothing of the above, then just flatten the newly joined segment
     LOG.debug("The youngest segment in the in-Memory Compaction Pipeline for store "
-        + compactingMemStore.getFamilyName() + " is going to be flattened");
+        + compactingMemStore.getFamilyName() + " is going to be flattened to the "
+        + compactingMemStore.getIndexType());
     return Action.FLATTEN;
   }
 
@@ -250,21 +252,22 @@ public class MemStoreCompactor {
 
         result = SegmentFactory.instance().createImmutableSegmentByCompaction(
           compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
-          versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED);
-        iterator.close();
-        break;
-      case MERGE:
-        iterator = new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
-            compactingMemStore.getComparator(), compactionKVMax);
+          versionedList.getNumOfCells(), compactingMemStore.getIndexType());
+      iterator.close();
+      break;
+    case MERGE:
+      iterator =
+          new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(),
+              compactingMemStore.getComparator(),
+              compactionKVMax);
 
         result = SegmentFactory.instance().createImmutableSegmentByMerge(
           compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
-          versionedList.getNumOfCells(), ImmutableSegment.Type.ARRAY_MAP_BASED,
-          versionedList.getStoreSegments());
-        iterator.close();
-        break;
-      default:
-        throw new RuntimeException("Unknown action " + action); // sanity check
+          versionedList.getNumOfCells(), versionedList.getStoreSegments(),
+          compactingMemStore.getIndexType());
+      iterator.close();
+      break;
+    default: throw new RuntimeException("Unknown action " + action); // sanity check
     }
 
     return result;

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
index da4efc8..24865e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
@@ -83,6 +83,12 @@ public interface MemStoreLAB {
    */
   void decScannerCount();
 
+  /**
+   * Return a new empty chunk without considering this chunk as current
+   * The space on this chunk will be allocated externally
+   */
+  Chunk getNewExternalChunk();
+
   public static MemStoreLAB newInstance(Configuration conf) {
     MemStoreLAB memStoreLAB = null;
     if (isEnabled(conf)) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index 8d37aad..c7035ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -253,6 +253,17 @@ public class MemStoreLABImpl implements MemStoreLAB {
     return null;
   }
 
+  // Returning a new chunk, without replacing current chunk,
+  // meaning MSLABImpl does not make the returned chunk as CurChunk.
+  // The space on this chunk will be allocated externally
+  // The interface is only for external callers
+  @Override
+  public Chunk getNewExternalChunk() {
+    Chunk c = this.chunkCreator.getChunk();
+    chunks.add(c.getId());
+    return c;
+  }
+
   @VisibleForTesting
   Chunk getCurrentChunk() {
     return this.curChunk.get();

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
index 1db54c5..ea8dfde 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -42,6 +42,7 @@ public class MutableSegment extends Segment {
 
   protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB) {
     super(cellSet, comparator, memStoreLAB);
+    incSize(0,DEEP_OVERHEAD); // update the mutable segment metadata
   }
 
   /**
@@ -121,4 +122,8 @@ public class MutableSegment extends Segment {
   public long getMinTimestamp() {
     return this.timeRangeTracker.getMin();
   }
+
+  @Override protected long indexEntrySize() {
+      return ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index fc78718..e4740ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -305,6 +305,10 @@ public abstract class Segment {
     }
   }
 
+  protected void updateMetaInfo(Cell cellToAdd, boolean succ, MemstoreSize memstoreSize) {
+    updateMetaInfo(cellToAdd, succ, (getMemStoreLAB()!=null), memstoreSize);
+  }
+
   /**
    * @return The increase in heap size because of this cell addition. This includes this cell POJO's
    *         heap size itself and additional overhead because of addition on to CSLM.
@@ -312,11 +316,13 @@ public abstract class Segment {
   protected long heapSizeChange(Cell cell, boolean succ) {
     if (succ) {
       return ClassSize
-          .align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
+          .align(indexEntrySize() + CellUtil.estimatedHeapSizeOf(cell));
     }
     return 0;
   }
 
+  protected abstract long indexEntrySize();
+
   /**
    * Returns a subset of the segment cell set, which starts with the given cell
    * @param firstCell a cell in the segment
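
heapSizeChange() is now a template method: each segment subtype contributes its own index entry overhead through indexEntrySize(). Back-of-envelope, assuming a 64-byte cell heap size (all figures illustrative):

public class HeapSizeChangeSketch {
  static long align(long n) { return (n + 7) & ~7L; }  // 8-byte alignment, cf. ClassSize.align

  public static void main(String[] args) {
    long cellHeap = 64;            // assumed CellUtil.estimatedHeapSizeOf(cell)
    long cslmEntry = 48;           // illustrative CONCURRENT_SKIPLISTMAP_ENTRY
    long chunkEntry = 3 * 4 + 8;   // CELL_CHUNK_MAP_ENTRY = 20
    System.out.println(align(cslmEntry + cellHeap));   // skip-list segment: 112
    System.out.println(align(chunkEntry + cellHeap));  // CellChunkMap segment: 88
  }
}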

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
index bd2cd29..3777f35 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -41,42 +40,36 @@ public final class SegmentFactory {
     return instance;
   }
 
-  // create skip-list-based (non-flat) immutable segment from compacting old immutable segments
-  public ImmutableSegment createImmutableSegment(final Configuration conf,
-      final CellComparator comparator, MemStoreSegmentsIterator iterator) {
-    return new ImmutableSegment(comparator, iterator, MemStoreLAB.newInstance(conf));
-  }
-
   // create composite immutable segment from a list of segments
+  // for snapshot consisting of multiple segments
   public CompositeImmutableSegment createCompositeImmutableSegment(
       final CellComparator comparator, List<ImmutableSegment> segments) {
     return new CompositeImmutableSegment(comparator, segments);
-
   }
 
   // create new flat immutable segment from compacting old immutable segments
+  // for compaction
   public ImmutableSegment createImmutableSegmentByCompaction(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
-      ImmutableSegment.Type segmentType)
+      CompactingMemStore.IndexType idxType)
       throws IOException {
-    Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
-        "wrong immutable segment type");
+
     MemStoreLAB memStoreLAB = MemStoreLAB.newInstance(conf);
     return
-        // the last parameter "false" means not to merge, but to compact the pipeline
-        // in order to create the new segment
-        new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, false);
+        createImmutableSegment(
+            conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.COMPACT,idxType);
   }
 
   // create empty immutable segment
+  // for initializations
   public ImmutableSegment createImmutableSegment(CellComparator comparator) {
     MutableSegment segment = generateMutableSegment(null, comparator, null);
     return createImmutableSegment(segment);
   }
 
-  // create immutable segment from mutable segment
+  // create not-flat immutable segment from mutable segment
   public ImmutableSegment createImmutableSegment(MutableSegment segment) {
-    return new ImmutableSegment(segment);
+    return new CSLMImmutableSegment(segment);
   }
 
   // create mutable segment
@@ -86,19 +79,58 @@ public final class SegmentFactory {
   }
 
   // create new flat immutable segment from merging old immutable segments
+  // for merge
   public ImmutableSegment createImmutableSegmentByMerge(final Configuration conf,
       final CellComparator comparator, MemStoreSegmentsIterator iterator, int numOfCells,
-      ImmutableSegment.Type segmentType, List<ImmutableSegment> segments)
+      List<ImmutableSegment> segments, CompactingMemStore.IndexType idxType)
       throws IOException {
-    Preconditions.checkArgument(segmentType == ImmutableSegment.Type.ARRAY_MAP_BASED,
-        "wrong immutable segment type");
+
     MemStoreLAB memStoreLAB = getMergedMemStoreLAB(conf, segments);
     return
-        // the last parameter "true" means to merge the compaction pipeline
-        // in order to create the new segment
-        new ImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, segmentType, true);
+        createImmutableSegment(
+            conf,comparator,iterator,memStoreLAB,numOfCells,MemStoreCompactor.Action.MERGE,idxType);
+
+  }
+
+  // create flat immutable segment from non-flat immutable segment
+  // for flattening
+  public ImmutableSegment createImmutableSegmentByFlattening(
+      CSLMImmutableSegment segment, CompactingMemStore.IndexType idxType, MemstoreSize memstoreSize) {
+    ImmutableSegment res = null;
+    switch (idxType) {
+    case CHUNK_MAP:
+      res = new CellChunkImmutableSegment(segment, memstoreSize);
+      break;
+    case CSLM_MAP:
+      assert false; // non-flat segment can not be the result of flattening
+      break;
+    case ARRAY_MAP:
+      res = new CellArrayImmutableSegment(segment, memstoreSize);
+      break;
+    }
+    return res;
   }
+
+
   //****** private methods to instantiate concrete store segments **********//
+  private ImmutableSegment createImmutableSegment(final Configuration conf, final CellComparator comparator,
+      MemStoreSegmentsIterator iterator, MemStoreLAB memStoreLAB, int numOfCells,
+      MemStoreCompactor.Action action, CompactingMemStore.IndexType idxType) {
+
+    ImmutableSegment res = null;
+    switch (idxType) {
+    case CHUNK_MAP:
+      res = new CellChunkImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action);
+      break;
+    case CSLM_MAP:
+      assert false; // non-flat segment can not be created here
+      break;
+    case ARRAY_MAP:
+      res = new CellArrayImmutableSegment(comparator, iterator, memStoreLAB, numOfCells, action);
+      break;
+    }
+    return res;
+  }
 
   private MutableSegment generateMutableSegment(final Configuration conf, CellComparator comparator,
       MemStoreLAB memStoreLAB) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 8a6e19b..8ef666d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -369,6 +369,7 @@ public class TestHeapSize  {
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicReference.class, true);
       ClassSize.estimateBase(CellSet.class, true);
       ClassSize.estimateBase(TimeRangeTracker.class, true);
@@ -376,9 +377,28 @@ public class TestHeapSize  {
       assertEquals(expected, actual);
     }
 
-    // ImmutableSegment Deep overhead
+    // ImmutableSegments Deep overhead
     cl = ImmutableSegment.class;
-    actual = ImmutableSegment.DEEP_OVERHEAD_CSLM;
+    actual = ImmutableSegment.DEEP_OVERHEAD;
+    expected = ClassSize.estimateBase(cl, false);
+    expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
+    expected += ClassSize.estimateBase(AtomicReference.class, false);
+    expected += ClassSize.estimateBase(CellSet.class, false);
+    expected += ClassSize.estimateBase(TimeRangeTracker.class, false);
+    expected += ClassSize.estimateBase(TimeRange.class, false);
+    if (expected != actual) {
+      ClassSize.estimateBase(cl, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicReference.class, true);
+      ClassSize.estimateBase(CellSet.class, true);
+      ClassSize.estimateBase(TimeRangeTracker.class, true);
+      ClassSize.estimateBase(TimeRange.class, true);
+      assertEquals(expected, actual);
+    }
+
+    cl = CSLMImmutableSegment.class;
+    actual = CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
     expected = ClassSize.estimateBase(cl, false);
     expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
     expected += ClassSize.estimateBase(AtomicReference.class, false);
@@ -389,6 +409,7 @@ public class TestHeapSize  {
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicReference.class, true);
       ClassSize.estimateBase(CellSet.class, true);
       ClassSize.estimateBase(TimeRangeTracker.class, true);
@@ -396,7 +417,8 @@ public class TestHeapSize  {
       ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
       assertEquals(expected, actual);
     }
-    actual = ImmutableSegment.DEEP_OVERHEAD_CAM;
+    cl = CellArrayImmutableSegment.class;
+    actual = CellArrayImmutableSegment.DEEP_OVERHEAD_CAM;
     expected = ClassSize.estimateBase(cl, false);
     expected += 2 * ClassSize.estimateBase(AtomicLong.class, false);
     expected += ClassSize.estimateBase(AtomicReference.class, false);
@@ -407,6 +429,7 @@ public class TestHeapSize  {
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       ClassSize.estimateBase(AtomicLong.class, true);
+      ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(AtomicReference.class, true);
       ClassSize.estimateBase(CellSet.class, true);
       ClassSize.estimateBase(TimeRangeTracker.class, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
index 5872d69..6307d32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellFlatSet.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -286,8 +287,8 @@ public class TestCellFlatSet extends TestCase {
 
     ByteBuffer idxBuffer = idxChunk.getData();  // the buffers of the chunks
     ByteBuffer dataBuffer = dataChunk.getData();
-    int dataOffset = Bytes.SIZEOF_INT;          // offset inside data buffer
-    int idxOffset = Bytes.SIZEOF_INT;           // skip the space for chunk ID
+    int dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;          // offset inside data buffer
+    int idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;           // skip the space for chunk ID
 
     Cell[] cellArray = asc ? ascCells : descCells;
 
@@ -296,16 +297,16 @@ public class TestCellFlatSet extends TestCase {
       if (dataOffset + KeyValueUtil.length(kv) > chunkCreator.getChunkSize()) {
         dataChunk = chunkCreator.getChunk();    // allocate more data chunks if needed
         dataBuffer = dataChunk.getData();
-        dataOffset = Bytes.SIZEOF_INT;
+        dataOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
       }
       int dataStartOfset = dataOffset;
       dataOffset = KeyValueUtil.appendTo(kv, dataBuffer, dataOffset, false); // write deep cell data
 
       // do we have enough space to write the cell-representation on the index chunk?
-      if (idxOffset + CellChunkMap.SIZEOF_CELL_REP > chunkCreator.getChunkSize()) {
+      if (idxOffset + ClassSize.CELL_CHUNK_MAP_ENTRY > chunkCreator.getChunkSize()) {
         idxChunk = chunkCreator.getChunk();    // allocate more index chunks if needed
         idxBuffer = idxChunk.getData();
-        idxOffset = Bytes.SIZEOF_INT;
+        idxOffset = ChunkCreator.SIZEOF_CHUNK_HEADER;
         chunkArray[chunkArrayIdx++] = idxChunk;
       }
       idxOffset = ByteBufferUtils.putInt(idxBuffer, idxOffset, dataChunk.getId()); // write data chunk id
@@ -314,8 +315,6 @@ public class TestCellFlatSet extends TestCase {
       idxOffset = ByteBufferUtils.putLong(idxBuffer, idxOffset, kv.getSequenceId());     // seqId
     }
 
-    return asc ?
-        new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,false) :
-        new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,true);
+    return new CellChunkMap(CellComparator.COMPARATOR,chunkArray,0,NUM_OF_CELLS,!asc);
   }
 }
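
The test builds its index chunks by hand; the write side is the mirror image of the getCell() math sketched earlier. A standalone sketch of appending one cell-representation (plain java.nio, illustrative names):

import java.nio.ByteBuffer;

class CellRepWriterSketch {
  // Append one 20-byte cell-representation and return the advanced offset.
  static int putCellRep(ByteBuffer idx, int idxOffset,
      int dataChunkId, int cellOffset, int cellLength, long seqId) {
    idx.putInt(idxOffset, dataChunkId);     // data chunk holding the serialized cell
    idx.putInt(idxOffset + 4, cellOffset);  // where the cell starts in that chunk
    idx.putInt(idxOffset + 8, cellLength);  // serialized cell length
    idx.putLong(idxOffset + 12, seqId);     // sequence id (MVCC read point)
    return idxOffset + 20;                  // one entry consumed
  }
}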

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index c5570e7..dc3cf4d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
@@ -98,7 +99,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
         .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
     chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
-      globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
+        globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
     assertTrue(chunkCreator != null);
   }
 
@@ -563,16 +564,71 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     assertTrue(chunkCreator.getPoolSize() > 0);
   }
 
+  @Test
+  public void testFlatteningToCellChunkMap() throws IOException {
+
+    // set memstore to flat into CellChunkMap
+    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
+    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(compactionType));
+    ((CompactingMemStore)memstore).initiateType(compactionType);
+    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+        String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));
+    ((CompactingMemStore)memstore).setIndexType();
+    int numOfCells = 8;
+    String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8
+
+    // make one cell
+    byte[] row = Bytes.toBytes(keys1[0]);
+    byte[] val = Bytes.toBytes(keys1[0] + 0);
+    KeyValue kv =
+        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
+            System.currentTimeMillis(), val);
+
+    // test 1 bucket
+    int totalCellsLen = addRowsByKeys(memstore, keys1);
+    long oneCellOnCSLMHeapSize =
+        ClassSize.align(
+            ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil
+                .length(kv));
+
+    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
+
+    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
+    assertEquals(0, memstore.getSnapshot().getCellsCount());
+    // One cell is duplicated, but BASIC mode only flattens and never removes duplicates,
+    // so totalCellsLen should remain the same
+    long oneCellOnCCMHeapSize =
+        ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
+    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+        + numOfCells * oneCellOnCCMHeapSize;
+
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
+
+    MemstoreSize size = memstore.getFlushableSize();
+    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
+    region.decrMemstoreSize(size);  // simulate flusher
+    ImmutableSegment s = memstore.getSnapshot();
+    assertEquals(numOfCells, s.getCellsCount());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
+
+    memstore.clearSnapshot(snapshot.getId());
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // Compaction tests
   //////////////////////////////////////////////////////////////////////////////
   @Test
   public void testCompaction1Bucket() throws IOException {
 
-    // set memstore to do data compaction and not to use the speculative scan
-    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
-    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
-        String.valueOf(compactionType));
+    // set memstore to do basic structure flattening; the "eager" option is tested in
+    // TestCompactingToCellFlatMapMemStore
+    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
+    memstore.getConfiguration()
+        .set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(compactionType));
     ((CompactingMemStore)memstore).initiateType(compactionType);
 
     String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
@@ -581,25 +637,24 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     int totalCellsLen = addRowsByKeys(memstore, keys1);
     int oneCellOnCSLMHeapSize = 120;
     int oneCellOnCAHeapSize = 88;
-    long totalHeapSize = 4 * oneCellOnCSLMHeapSize;
+    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
     assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
-    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
-    // totalCellsLen
-    totalCellsLen = (totalCellsLen * 3) / 4;
-    totalHeapSize = 3 * oneCellOnCAHeapSize;
+    // There is no compaction, as the compacting memstore type is basic.
+    // totalCellsLen remains the same
+    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+        + 4 * oneCellOnCAHeapSize;
     assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
-    size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
     region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
-    assertEquals(3, s.getCellsCount());
+    assertEquals(4, s.getCellsCount());
     assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
@@ -608,8 +663,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
   @Test
   public void testCompaction2Buckets() throws IOException {
 
-    // set memstore to do data compaction and not to use the speculative scan
-    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
+    // set memstore to do basic structure flattening; the "eager" option is tested in
+    // TestCompactingToCellFlatMapMemStore
+    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
     memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
         String.valueOf(compactionType));
     ((CompactingMemStore)memstore).initiateType(compactionType);
@@ -619,44 +675,43 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     int totalCellsLen1 = addRowsByKeys(memstore, keys1);
     int oneCellOnCSLMHeapSize = 120;
     int oneCellOnCAHeapSize = 88;
-    long totalHeapSize = 4 * oneCellOnCSLMHeapSize;
+    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
 
     assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
-    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     int counter = 0;
     for ( Segment s : memstore.getSegments()) {
       counter += s.getCellsCount();
     }
-    assertEquals(3, counter);
+    assertEquals(4, counter);
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting
-    // totalCellsLen
-    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    // There is no compaction, as the compacting memstore type is basic.
+    // totalCellsLen remains the same
     assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
-    totalHeapSize = 3 * oneCellOnCAHeapSize;
+    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+        + 4 * oneCellOnCAHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
     int totalCellsLen2 = addRowsByKeys(memstore, keys2);
     totalHeapSize += 3 * oneCellOnCSLMHeapSize;
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
+    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
 
-    size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    totalHeapSize = 4 * oneCellOnCAHeapSize;
+    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+        + 7 * oneCellOnCAHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
     region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
-    assertEquals(4, s.getCellsCount());
+    assertEquals(7, s.getCellsCount());
     assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
@@ -678,10 +733,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     int oneCellOnCSLMHeapSize = 120;
     int oneCellOnCAHeapSize = 88;
     assertEquals(totalCellsLen1, region.getMemstoreSize());
-    long totalHeapSize = 4 * oneCellOnCSLMHeapSize;
+    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
-
-    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
 
     assertEquals(0, memstore.getSnapshot().getCellsCount());
@@ -690,29 +743,31 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     totalCellsLen1 = (totalCellsLen1 * 3) / 4;
     assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
     // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
-    totalHeapSize = 3 * oneCellOnCAHeapSize;
+    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+        + 3 * oneCellOnCAHeapSize;
     assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
 
     int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells.
-    long totalHeapSize2 = 3 * oneCellOnCSLMHeapSize;
+    long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;
 
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+    assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
 
     ((CompactingMemStore) memstore).disableCompaction();
-    size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
     assertEquals(0, memstore.getSnapshot().getCellsCount());
     // No change in the cells data size. ie. memstore size. as there is no compaction.
     assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
+    assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
+        ((CompactingMemStore) memstore).heapSize());
 
     int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added
     assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
         regionServicesForStores.getMemstoreSize());
-    long totalHeapSize3 = 3 * oneCellOnCSLMHeapSize;
-    assertEquals(totalHeapSize + totalHeapSize2 + totalHeapSize3,
-        ((CompactingMemStore) memstore).heapSize());
+    long totalHeapSize3 = totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
+        + 3 * oneCellOnCSLMHeapSize;
+    assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
 
     ((CompactingMemStore)memstore).enableCompaction();
     size = memstore.getFlushableSize();
@@ -725,7 +780,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
         regionServicesForStores.getMemstoreSize());
     // Only 4 unique cells left
-    assertEquals(4 * oneCellOnCAHeapSize, ((CompactingMemStore) memstore).heapSize());
+    assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
+        + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
deleted file mode 100644
index 66e107a..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.util.List;
-
-
-
-/**
- * compacted memstore test case
- */
-@Category({RegionServerTests.class, MediumTests.class})
-public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore {
-
-  private static final Log LOG = LogFactory.getLog(TestCompactingToCellArrayMapMemStore.class);
-
-  //////////////////////////////////////////////////////////////////////////////
-  // Helpers
-  //////////////////////////////////////////////////////////////////////////////
-
-  @Override public void tearDown() throws Exception {
-    chunkCreator.clearChunksInPool();
-  }
-
-  @Override public void setUp() throws Exception {
-    compactingSetUp();
-    Configuration conf = HBaseConfiguration.create();
-
-    // set memstore to do data compaction
-    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
-        String.valueOf(MemoryCompactionPolicy.EAGER));
-
-    this.memstore =
-        new CompactingMemStore(conf, CellComparator.COMPARATOR, store,
-            regionServicesForStores, MemoryCompactionPolicy.EAGER);
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-  // Compaction tests
-  //////////////////////////////////////////////////////////////////////////////
-  public void testCompaction1Bucket() throws IOException {
-    int counter = 0;
-    String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
-
-    // test 1 bucket
-    long totalCellsLen = addRowsByKeys(memstore, keys1);
-    int oneCellOnCSLMHeapSize = 120;
-    int oneCellOnCAHeapSize = 88;
-    long totalHeapSize = 4 * oneCellOnCSLMHeapSize;
-    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
-
-    assertEquals(4, memstore.getActive().getCellsCount());
-    MemstoreSize size = memstore.getFlushableSize();
-    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
-    assertEquals(0, memstore.getSnapshot().getCellsCount());
-    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
-    // totalCellsLen
-    totalCellsLen = (totalCellsLen * 3) / 4;
-    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
-    totalHeapSize = 3 * oneCellOnCAHeapSize;
-    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
-    for ( Segment s : memstore.getSegments()) {
-      counter += s.getCellsCount();
-    }
-    assertEquals(3, counter);
-    size = memstore.getFlushableSize();
-    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.decrMemstoreSize(size);  // simulate flusher
-    ImmutableSegment s = memstore.getSnapshot();
-    assertEquals(3, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getMemstoreSize());
-
-    memstore.clearSnapshot(snapshot.getId());
-  }
-
-  public void testCompaction2Buckets() throws IOException {
-
-    String[] keys1 = { "A", "A", "B", "C" };
-    String[] keys2 = { "A", "B", "D" };
-
-    long totalCellsLen1 = addRowsByKeys(memstore, keys1);
-    int oneCellOnCSLMHeapSize = 120;
-    int oneCellOnCAHeapSize = 88;
-    long totalHeapSize1 = 4 * oneCellOnCSLMHeapSize;
-    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
-    MemstoreSize size = memstore.getFlushableSize();
-
-    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
-    int counter = 0;
-    for ( Segment s : memstore.getSegments()) {
-      counter += s.getCellsCount();
-    }
-    assertEquals(3,counter);
-    assertEquals(0, memstore.getSnapshot().getCellsCount());
-    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
-    // totalCellsLen
-    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
-    totalHeapSize1 = 3 * oneCellOnCAHeapSize;
-    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
-
-    long totalCellsLen2 = addRowsByKeys(memstore, keys2);
-    long totalHeapSize2 = 3 * oneCellOnCSLMHeapSize;
-    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
-
-    size = memstore.getFlushableSize();
-    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
-    assertEquals(0, memstore.getSnapshot().getCellsCount());
-    counter = 0;
-    for ( Segment s : memstore.getSegments()) {
-      counter += s.getCellsCount();
-    }
-    assertEquals(4,counter);
-    totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
-    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    totalHeapSize2 = 1 * oneCellOnCAHeapSize;
-    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
-
-    size = memstore.getFlushableSize();
-    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.decrMemstoreSize(size);  // simulate flusher
-    ImmutableSegment s = memstore.getSnapshot();
-    assertEquals(4, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getMemstoreSize());
-
-    memstore.clearSnapshot(snapshot.getId());
-  }
-
-  public void testCompaction3Buckets() throws IOException {
-
-    String[] keys1 = { "A", "A", "B", "C" };
-    String[] keys2 = { "A", "B", "D" };
-    String[] keys3 = { "D", "B", "B" };
-
-    long totalCellsLen1 = addRowsByKeys(memstore, keys1);
-    int oneCellOnCSLMHeapSize = 120;
-    int oneCellOnCAHeapSize = 88;
-    long totalHeapSize1 = 4 * oneCellOnCSLMHeapSize;
-    assertEquals(totalCellsLen1, region.getMemstoreSize());
-    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
-
-    MemstoreSize size = memstore.getFlushableSize();
-    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
-
-    assertEquals(0, memstore.getSnapshot().getCellsCount());
-    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
-    // totalCellsLen
-    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
-    totalHeapSize1 = 3 * oneCellOnCAHeapSize;
-    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
-
-    long totalCellsLen2 = addRowsByKeys(memstore, keys2);
-    long totalHeapSize2 = 3 * oneCellOnCSLMHeapSize;
-
-    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
-
-    ((CompactingMemStore) memstore).disableCompaction();
-    size = memstore.getFlushableSize();
-    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
-    assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
-
-    long totalCellsLen3 = addRowsByKeys(memstore, keys3);
-    long totalHeapSize3 = 3 * oneCellOnCSLMHeapSize;
-    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
-        regionServicesForStores.getMemstoreSize());
-    assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
-        ((CompactingMemStore) memstore).heapSize());
-
-    ((CompactingMemStore) memstore).enableCompaction();
-    size = memstore.getFlushableSize();
-    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
-    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
-      Threads.sleep(10);
-    }
-    assertEquals(0, memstore.getSnapshot().getCellsCount());
-    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
-    // Out of total 10, only 4 cells are unique
-    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
-    totalCellsLen3 = 0;// All duplicated cells.
-    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
-        regionServicesForStores.getMemstoreSize());
-    // Only 4 unique cells left
-    assertEquals(4 * oneCellOnCAHeapSize, ((CompactingMemStore) memstore).heapSize());
-
-    size = memstore.getFlushableSize();
-    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.decrMemstoreSize(size);  // simulate flusher
-    ImmutableSegment s = memstore.getSnapshot();
-    assertEquals(4, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getMemstoreSize());
-
-    memstore.clearSnapshot(snapshot.getId());
-
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-  // Merging tests
-  //////////////////////////////////////////////////////////////////////////////
-  @Test
-  public void testMerging() throws IOException {
-
-    String[] keys1 = { "A", "A", "B", "C", "F", "H"};
-    String[] keys2 = { "A", "B", "D", "G", "I", "J"};
-    String[] keys3 = { "D", "B", "B", "E" };
-
-    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
-    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
-        String.valueOf(compactionType));
-    ((CompactingMemStore)memstore).initiateType(compactionType);
-    addRowsByKeys(memstore, keys1);
-
-    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
-
-    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
-      Threads.sleep(10);
-    }
-    assertEquals(0, memstore.getSnapshot().getCellsCount());
-
-    addRowsByKeys(memstore, keys2); // also should only flatten
-
-    int counter2 = 0;
-    for ( Segment s : memstore.getSegments()) {
-      counter2 += s.getCellsCount();
-    }
-    assertEquals(12, counter2);
-
-    ((CompactingMemStore) memstore).disableCompaction();
-
-    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
-    assertEquals(0, memstore.getSnapshot().getCellsCount());
-
-    int counter3 = 0;
-    for ( Segment s : memstore.getSegments()) {
-      counter3 += s.getCellsCount();
-    }
-    assertEquals(12, counter3);
-
-    addRowsByKeys(memstore, keys3);
-
-    int counter4 = 0;
-    for ( Segment s : memstore.getSegments()) {
-      counter4 += s.getCellsCount();
-    }
-    assertEquals(16, counter4);
-
-    ((CompactingMemStore) memstore).enableCompaction();
-
-
-    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
-    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
-      Threads.sleep(10);
-    }
-    assertEquals(0, memstore.getSnapshot().getCellsCount());
-
-    int counter = 0;
-    for ( Segment s : memstore.getSegments()) {
-      counter += s.getCellsCount();
-    }
-    assertEquals(16,counter);
-
-    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    ImmutableSegment s = memstore.getSnapshot();
-    memstore.clearSnapshot(snapshot.getId());
-  }
-
-  @Test
-  public void testCountOfCellsAfterFlatteningByScan() throws IOException {
-    String[] keys1 = { "A", "B", "C" }; // A, B, C
-    addRowsByKeysWith50Cols(memstore, keys1);
-    // this should only flatten as there are no duplicates
-    ((CompactingMemStore) memstore).flushInMemory();
-    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
-      Threads.sleep(10);
-    }
-    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
-    // seek
-    int count = 0;
-    for(int i = 0; i < scanners.size(); i++) {
-      scanners.get(i).seek(KeyValue.LOWESTKEY);
-      while (scanners.get(i).next() != null) {
-        count++;
-      }
-    }
-    assertEquals("the count should be ", count, 150);
-    for(int i = 0; i < scanners.size(); i++) {
-      scanners.get(i).close();
-    }
-  }
-
-  @Test
-  public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
-    String[] keys1 = { "A", "B", "C" }; // A, B, C
-    addRowsByKeysWith50Cols(memstore, keys1);
-    // this should only flatten as there are no duplicates
-    ((CompactingMemStore) memstore).flushInMemory();
-    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
-      Threads.sleep(10);
-    }
-    // Just doing the cnt operation here
-    MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
-        ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
-        CellComparator.COMPARATOR, 10);
-    int cnt = 0;
-    try {
-      while (itr.next() != null) {
-        cnt++;
-      }
-    } finally {
-      itr.close();
-    }
-    assertEquals("the count should be ", cnt, 150);
-  }
-
-
-  private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
-    byte[] fam = Bytes.toBytes("testfamily");
-    for (int i = 0; i < keys.length; i++) {
-      long timestamp = System.currentTimeMillis();
-      Threads.sleep(1); // to make sure each kv gets a different ts
-      byte[] row = Bytes.toBytes(keys[i]);
-      for(int  j =0 ;j < 50; j++) {
-        byte[] qf = Bytes.toBytes("testqualifier"+j);
-        byte[] val = Bytes.toBytes(keys[i] + j);
-        KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-        hmc.add(kv, null);
-      }
-    }
-  }
-
-  @Override
-  @Test
-  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
-    byte[] row = Bytes.toBytes("testrow");
-    byte[] fam = Bytes.toBytes("testfamily");
-    byte[] qf1 = Bytes.toBytes("testqualifier1");
-    byte[] qf2 = Bytes.toBytes("testqualifier2");
-    byte[] qf3 = Bytes.toBytes("testqualifier3");
-    byte[] qf4 = Bytes.toBytes("testqualifier4");
-    byte[] qf5 = Bytes.toBytes("testqualifier5");
-    byte[] qf6 = Bytes.toBytes("testqualifier6");
-    byte[] qf7 = Bytes.toBytes("testqualifier7");
-    byte[] val = Bytes.toBytes("testval");
-
-    // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val), null);
-    memstore.add(new KeyValue(row, fam, qf2, val), null);
-    memstore.add(new KeyValue(row, fam, qf3, val), null);
-
-    // Creating a snapshot
-    MemStoreSnapshot snapshot = memstore.snapshot();
-    assertEquals(3, memstore.getSnapshot().getCellsCount());
-
-    // Adding value to "new" memstore
-    assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf4, val), null);
-    memstore.add(new KeyValue(row, fam, qf5, val), null);
-    assertEquals(2, memstore.getActive().getCellsCount());
-
-    // opening scanner before clear the snapshot
-    List<KeyValueScanner> scanners = memstore.getScanners(0);
-    // Shouldn't putting back the chunks to pool,since some scanners are opening
-    // based on their data
-    // close the scanners
-    for(KeyValueScanner scanner : snapshot.getScanners()) {
-      scanner.close();
-    }
-    memstore.clearSnapshot(snapshot.getId());
-
-    assertTrue(chunkCreator.getPoolSize() == 0);
-
-    // Chunks will be put back to pool after close scanners;
-    for (KeyValueScanner scanner : scanners) {
-      scanner.close();
-    }
-    assertTrue(chunkCreator.getPoolSize() > 0);
-
-    // clear chunks
-    chunkCreator.clearChunksInPool();
-
-    // Creating another snapshot
-
-    snapshot = memstore.snapshot();
-    // Adding more value
-    memstore.add(new KeyValue(row, fam, qf6, val), null);
-    memstore.add(new KeyValue(row, fam, qf7, val), null);
-    // opening scanners
-    scanners = memstore.getScanners(0);
-    // close scanners before clear the snapshot
-    for (KeyValueScanner scanner : scanners) {
-      scanner.close();
-    }
-    // Since no opening scanner, the chunks of snapshot should be put back to
-    // pool
-    // close the scanners
-    for(KeyValueScanner scanner : snapshot.getScanners()) {
-      scanner.close();
-    }
-    memstore.clearSnapshot(snapshot.getId());
-    assertTrue(chunkCreator.getPoolSize() > 0);
-  }
-
-  @Test
-  public void testPuttingBackChunksAfterFlushing() throws IOException {
-    byte[] row = Bytes.toBytes("testrow");
-    byte[] fam = Bytes.toBytes("testfamily");
-    byte[] qf1 = Bytes.toBytes("testqualifier1");
-    byte[] qf2 = Bytes.toBytes("testqualifier2");
-    byte[] qf3 = Bytes.toBytes("testqualifier3");
-    byte[] qf4 = Bytes.toBytes("testqualifier4");
-    byte[] qf5 = Bytes.toBytes("testqualifier5");
-    byte[] val = Bytes.toBytes("testval");
-
-    // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val), null);
-    memstore.add(new KeyValue(row, fam, qf2, val), null);
-    memstore.add(new KeyValue(row, fam, qf3, val), null);
-
-    // Creating a snapshot
-    MemStoreSnapshot snapshot = memstore.snapshot();
-    assertEquals(3, memstore.getSnapshot().getCellsCount());
-
-    // Adding value to "new" memstore
-    assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf4, val), null);
-    memstore.add(new KeyValue(row, fam, qf5, val), null);
-    assertEquals(2, memstore.getActive().getCellsCount());
-    // close the scanners
-    for(KeyValueScanner scanner : snapshot.getScanners()) {
-      scanner.close();
-    }
-    memstore.clearSnapshot(snapshot.getId());
-
-    int chunkCount = chunkCreator.getPoolSize();
-    assertTrue(chunkCount > 0);
-  }
-
-
-  private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
-    byte[] fam = Bytes.toBytes("testfamily");
-    byte[] qf = Bytes.toBytes("testqualifier");
-    MemstoreSize memstoreSize = new MemstoreSize();
-    for (int i = 0; i < keys.length; i++) {
-      long timestamp = System.currentTimeMillis();
-      Threads.sleep(1); // to make sure each kv gets a different ts
-      byte[] row = Bytes.toBytes(keys[i]);
-      byte[] val = Bytes.toBytes(keys[i] + i);
-      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-      hmc.add(kv, memstoreSize);
-      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
-    }
-    regionServicesForStores.addMemstoreSize(memstoreSize);
-    return memstoreSize.getDataSize();
-  }
-}
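
This deleted class pinned the memstore to the EAGER policy in setUp(); its compaction coverage now lives in TestCompactingToCellFlatMapMemStore, which the rewritten BASIC tests above point to. For reference, a minimal configuration sketch combining the eager policy with the chunk-based index this patch introduces (the key and enum names are taken verbatim from the diff; the pairing itself is only an illustration):

  Configuration conf = HBaseConfiguration.create();
  // Compact (deduplicate) pipeline segments eagerly on every in-memory flush.
  conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(MemoryCompactionPolicy.EAGER));
  // Flatten pipeline segments into the new CellChunkMap index.
  conf.set(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
      String.valueOf(CompactingMemStore.IndexType.CHUNK_MAP));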

http://git-wip-us.apache.org/repos/asf/hbase/blob/95405fbd/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 148e110..e40ff8e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -954,9 +954,12 @@ public class TestDefaultMemStore {
         conf, FSTableDescriptors.createMetaTableDescriptor(conf),
         wFactory.getMetaWAL(HRegionInfo.FIRST_META_REGIONINFO.
             getEncodedNameAsBytes()));
-    HRegionInfo hri = new HRegionInfo(TableName.valueOf(name.getMethodName()),
+    // Parameterized tests add a [#] suffix; strip '[' and ']' to keep the table name legal.
+    HRegionInfo hri =
+        new HRegionInfo(TableName.valueOf(name.getMethodName().replaceAll("[\\[\\]]", "_")),
         Bytes.toBytes("row_0200"), Bytes.toBytes("row_0300"));
-    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(name.getMethodName()));
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(
+        name.getMethodName().replaceAll("[\\[\\]]", "_")));
     desc.addFamily(new HColumnDescriptor("foo".getBytes()));
     HRegion r =
         HRegion.createHRegion(hri, testDir, conf, desc,
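
The replaceAll in this hunk exists because JUnit's parameterized runner reports method names with a bracketed suffix, and brackets are illegal in an HBase table name. A quick illustration of the sanitization (the method name here is hypothetical):

  String reported = "testShouldFlushMeta[1]";              // name from a parameterized run
  String tableSafe = reported.replaceAll("[\\[\\]]", "_"); // -> "testShouldFlushMeta_1_"
  TableName tn = TableName.valueOf(tableSafe);             // now a legal table name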

