hbase-commits mailing list archives

From anoopsamj...@apache.org
Subject [3/4] hbase git commit: HBASE-16747 Track memstore data size and heap overhead separately.
Date Sun, 30 Oct 2016 06:51:11 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index bcaf3a2..b094476 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
-import org.apache.hadoop.hbase.io.HeapSize;
 
 /**
  * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s.
@@ -33,7 +32,7 @@ import org.apache.hadoop.hbase.io.HeapSize;
  * </p>
  */
 @InterfaceAudience.Private
-public interface MemStore extends HeapSize {
+public interface MemStore {
 
   /**
    * Creates a snapshot of the current memstore. Snapshot must be cleared by call to
@@ -58,27 +57,29 @@ public interface MemStore extends HeapSize {
    *
    * @return size of data that is going to be flushed
    */
-  long getFlushableSize();
+  MemstoreSize getFlushableSize();
 
   /**
    * Return the size of the snapshot(s) if any
    * @return size of the memstore snapshot
    */
-  long getSnapshotSize();
+  MemstoreSize getSnapshotSize();
 
   /**
    * Write an update
    * @param cell
-   * @return approximate size of the passed cell.
+   * @param memstoreSize The delta in memstore size is passed back via this object.
+   *        It includes both the data size and heap overhead deltas.
    */
-  long add(final Cell cell);
+  void add(final Cell cell, MemstoreSize memstoreSize);
 
   /**
    * Write the updates
    * @param cells
-   * @return approximate size of the passed cell.
+   * @param memstoreSize The delta in memstore size is passed back via this object.
+   *        It includes both the data size and heap overhead deltas.
    */
-  long add(Iterable<Cell> cells);
+  void add(Iterable<Cell> cells, MemstoreSize memstoreSize);
 
   /**
    * @return Oldest timestamp of all the Cells in the MemStore
@@ -86,30 +87,6 @@ public interface MemStore extends HeapSize {
   long timeOfOldestEdit();
 
   /**
-   * Write a delete
-   * @param deleteCell
-   * @return approximate size of the passed key and value.
-   */
-  long delete(final Cell deleteCell);
-
-  /**
-   * Given the specs of a column, update it, first by inserting a new record,
-   * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
-   * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
-   * store will ensure that the insert/delete each are atomic. A scanner/reader will either
-   * get the new value, or the old value and all readers will eventually only see the new
-   * value after the old was removed.
-   *
-   * @param row
-   * @param family
-   * @param qualifier
-   * @param newValue
-   * @param now
-   * @return Timestamp
-   */
-  long updateColumnValue(byte[] row, byte[] family, byte[] qualifier, long newValue, long now);
-
-  /**
    * Update or insert the specified cells.
    * <p>
    * For each Cell, insert into MemStore. This will atomically upsert the value for that
@@ -122,9 +99,10 @@ public interface MemStore extends HeapSize {
    * only see each KeyValue update as atomic.
    * @param cells
    * @param readpoint readpoint below which we can safely remove duplicate Cells.
-   * @return change in memstore size
+   * @param memstoreSize The delta in memstore size will be passed back via this.
+   *        This will include both data size and heap overhead delta.
    */
-  long upsert(Iterable<Cell> cells, long readpoint);
+  void upsert(Iterable<Cell> cells, long readpoint, MemstoreSize memstoreSize);
 
   /**
    * @return scanner over the memstore. This might include scanner over the snapshot when one is
@@ -133,13 +111,12 @@ public interface MemStore extends HeapSize {
   List<KeyValueScanner> getScanners(long readPt) throws IOException;
 
   /**
-   * @return Total memory occupied by this MemStore. This includes active segment size and heap size
-   *         overhead of this memstore but won't include any size occupied by the snapshot. We
-   *         assume the snapshot will get cleared soon. This is not thread safe and the memstore may
-   *         be changed while computing its size. It is the responsibility of the caller to make
-   *         sure this doesn't happen.
+   * @return Total memory occupied by this MemStore. This won't include any size occupied by the
+   *         snapshot. We assume the snapshot will get cleared soon. This is not thread safe and
+   *         the memstore may be changed while computing its size. It is the responsibility of the
+   *         caller to make sure this doesn't happen.
    */
-  long size();
+  MemstoreSize size();
 
   /**
    * This method is called when it is clear that the flush to disk is completed.

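The key API shift in this interface is that write paths no longer return a size delta as a long; the caller hands in a MemstoreSize object and the memstore accumulates both the data-size delta and the heap-overhead delta into it. A minimal standalone sketch of that pattern, assuming nothing beyond the JDK (the Mini* classes below are illustrative stand-ins, not HBase classes):

// Illustrative stand-in for MemstoreSize: tracks the two deltas together.
final class MiniMemstoreSize {
  private long dataSize;
  private long heapOverhead;
  void inc(long data, long overhead) { dataSize += data; heapOverhead += overhead; }
  long dataSize() { return dataSize; }
  long heapOverhead() { return heapOverhead; }
}

// Illustrative stand-in for the new MemStore.add(Cell, MemstoreSize) shape.
final class MiniMemStore {
  void add(byte[] cell, MiniMemstoreSize memstoreSize) {
    long data = cell.length;
    long overhead = 48; // assumed fixed per-cell object overhead for this sketch
    if (memstoreSize != null) { // null means the caller does not track the delta
      memstoreSize.inc(data, overhead);
    }
  }
}

public class DeltaPassBackDemo {
  public static void main(String[] args) {
    MiniMemStore store = new MiniMemStore();
    MiniMemstoreSize delta = new MiniMemstoreSize();
    store.add(new byte[] {1, 2, 3}, delta);
    store.add(new byte[] {4, 5}, delta);
    // One delta object can span several adds and then be applied to the
    // region/global accounting in a single step.
    System.out.println(delta.dataSize() + " data bytes, "
        + delta.heapOverhead() + " overhead bytes"); // 5 data bytes, 96 overhead bytes
  }
}
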
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 8f78a3b..2f4d225 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -197,9 +197,12 @@ class MemStoreFlusher implements FlushRequester {
            ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
            (bestRegionReplica.getMemstoreSize()
                > secondaryMultiplier * regionToFlush.getMemstoreSize()))) {
-        LOG.info("Refreshing storefiles of region " + bestRegionReplica +
-          " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt(
-            server.getRegionServerAccounting().getGlobalMemstoreSize()));
+        LOG.info("Refreshing storefiles of region " + bestRegionReplica
+            + " due to global heap pressure. Total memstore size="
+            + StringUtils
+                .humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize())
+            + " memstore heap overhead=" + StringUtils.humanReadableInt(
+                server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead()));
         flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
         if (!flushedOne) {
           LOG.info("Excluding secondary region " + bestRegionReplica +
@@ -343,16 +346,16 @@ class MemStoreFlusher implements FlushRequester {
    * Return true if global memory usage is above the high watermark
    */
   private boolean isAboveHighWaterMark() {
-    return server.getRegionServerAccounting().
-      getGlobalMemstoreSize() >= globalMemStoreLimit;
+    return server.getRegionServerAccounting().getGlobalMemstoreSize()
+        + server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead() >= globalMemStoreLimit;
   }
 
   /**
    * Return true if we're above the low watermark
    */
   private boolean isAboveLowWaterMark() {
-    return server.getRegionServerAccounting().
-      getGlobalMemstoreSize() >= globalMemStoreLimitLowMark;
+    return server.getRegionServerAccounting().getGlobalMemstoreSize() + server
+        .getRegionServerAccounting().getGlobalMemstoreHeapOverhead() >= globalMemStoreLimitLowMark;
   }
 
   @Override
@@ -586,11 +589,13 @@ class MemStoreFlusher implements FlushRequester {
           while (isAboveHighWaterMark() && !server.isStopped()) {
             if (!blocked) {
               startTime = EnvironmentEdgeManager.currentTime();
-              LOG.info("Blocking updates on "
-                  + server.toString()
-                  + ": the global memstore size "
-                  + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting()
-                      .getGlobalMemstoreSize(), "", 1) + " is >= than blocking "
+              LOG.info("Blocking updates on " + server.toString() + ": the global memstore size "
+                  + TraditionalBinaryPrefix.long2String(
+                      server.getRegionServerAccounting().getGlobalMemstoreSize(), "", 1)
+                  + " + global memstore heap overhead "
+                  + TraditionalBinaryPrefix.long2String(
+                      server.getRegionServerAccounting().getGlobalMemstoreHeapOverhead(), "", 1)
+                  + " is >= than blocking "
                   + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size");
             }
             blocked = true;

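The watermark checks above now size the global memstore as data size plus heap overhead before comparing against the limits. A hedged, self-contained sketch of that comparison (field names and numbers are made up for illustration, not the actual MemStoreFlusher fields):

public class WatermarkDemo {
  static long globalMemstoreLimit = 100;
  static long globalMemstoreLimitLowMark = 80;
  static long globalDataSize = 70;
  static long globalHeapOverhead = 25;

  // Data size alone no longer decides flushing; heap overhead counts too.
  static boolean isAboveHighWaterMark() {
    return globalDataSize + globalHeapOverhead >= globalMemstoreLimit;
  }

  static boolean isAboveLowWaterMark() {
    return globalDataSize + globalHeapOverhead >= globalMemstoreLimitLowMark;
  }

  public static void main(String[] args) {
    // 70 + 25 = 95: above the low mark (80) but below the high mark (100).
    System.out.println("high=" + isAboveHighWaterMark() + " low=" + isAboveLowWaterMark());
  }
}
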
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
index 1bb4511..74d1e17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
@@ -28,7 +28,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 public class MemStoreSnapshot {
   private final long id;
   private final int cellsCount;
-  private final long size;
+  private final long dataSize;
+  private final long heapOverhead;
   private final TimeRangeTracker timeRangeTracker;
   private final KeyValueScanner scanner;
   private final boolean tagsPresent;
@@ -36,7 +37,8 @@ public class MemStoreSnapshot {
   public MemStoreSnapshot(long id, ImmutableSegment snapshot) {
     this.id = id;
     this.cellsCount = snapshot.getCellsCount();
-    this.size = snapshot.keySize();
+    this.dataSize = snapshot.keySize();
+    this.heapOverhead = snapshot.heapOverhead();
     this.timeRangeTracker = snapshot.getTimeRangeTracker();
     this.scanner = snapshot.getKeyValueScanner();
     this.tagsPresent = snapshot.isTagsPresent();
@@ -59,8 +61,12 @@ public class MemStoreSnapshot {
   /**
    * @return Total memory size occupied by this snapshot.
    */
-  public long getSize() {
-    return size;
+  public long getDataSize() {
+    return dataSize;
+  }
+
+  public long getHeapOverhead() {
+    return this.heapOverhead;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
new file mode 100644
index 0000000..77cea51
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemstoreSize.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Wraps the data size part and heap overhead of the memstore.
+ */
+@InterfaceAudience.Private
+public class MemstoreSize {
+
+  static final MemstoreSize EMPTY_SIZE = new MemstoreSize();
+
+  private long dataSize;
+  private long heapOverhead;
+
+  public MemstoreSize() {
+    dataSize = 0;
+    heapOverhead = 0;
+  }
+
+  public MemstoreSize(long dataSize, long heapOverhead) {
+    this.dataSize = dataSize;
+    this.heapOverhead = heapOverhead;
+  }
+
+  public void incMemstoreSize(long dataSize, long heapOverhead) {
+    this.dataSize += dataSize;
+    this.heapOverhead += heapOverhead;
+  }
+
+  public void incMemstoreSize(MemstoreSize size) {
+    this.dataSize += size.dataSize;
+    this.heapOverhead += size.heapOverhead;
+  }
+
+  public void decMemstoreSize(long dataSize, long heapOverhead) {
+    this.dataSize -= dataSize;
+    this.heapOverhead -= heapOverhead;
+  }
+
+  public void decMemstoreSize(MemstoreSize size) {
+    this.dataSize -= size.dataSize;
+    this.heapOverhead -= size.heapOverhead;
+  }
+
+  public long getDataSize() {
+    return dataSize;
+  }
+
+  public long getHeapOverhead() {
+    return heapOverhead;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null || !(obj instanceof MemstoreSize)) {
+      return false;
+    }
+    MemstoreSize other = (MemstoreSize) obj;
+    return this.dataSize == other.dataSize && this.heapOverhead == other.heapOverhead;
+  }
+
+  @Override
+  public int hashCode() {
+    long h = 13 * this.dataSize;
+    h = h + 14 * this.heapOverhead;
+    return (int) h;
+  }
+
+  @Override
+  public String toString() {
+    return "dataSize=" + this.dataSize + " , heapOverhead=" + this.heapOverhead;
+  }
+}

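Since the full MemstoreSize source is above, here is a small usage sketch exercising exactly that class; it assumes the hbase-server module containing this patch is on the classpath:

import org.apache.hadoop.hbase.regionserver.MemstoreSize;

public class MemstoreSizeDemo {
  public static void main(String[] args) {
    MemstoreSize size = new MemstoreSize();
    size.incMemstoreSize(1024, 128); // one cell: data size + heap overhead
    size.incMemstoreSize(new MemstoreSize(512, 64));
    System.out.println(size); // dataSize=1536 , heapOverhead=192
    size.decMemstoreSize(512, 64); // e.g. a compacted-away duplicate
    System.out.println(size.getDataSize()); // 1024
    System.out.println(size.getHeapOverhead()); // 128
    System.out.println(size.equals(new MemstoreSize(1024, 128))); // true
  }
}
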
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 3ab1dba..882044c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -685,7 +685,7 @@ class MetricsRegionServerWrapperImpl
           tempNumStores += storeList.size();
           for (Store store : storeList) {
             tempNumStoreFiles += store.getStorefilesCount();
-            tempMemstoreSize += store.getMemStoreSize();
+            tempMemstoreSize += store.getSizeOfMemStore().getDataSize();
             tempStoreFileSize += store.getStorefilesSize();
 
             long storeMaxStoreFileAge = store.getMaxStoreFileAge();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
index aed75c2..ecee53f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
@@ -203,7 +203,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
       if (region.stores != null) {
         for (Store store : region.stores.values()) {
           tempNumStoreFiles += store.getStorefilesCount();
-          tempMemstoreSize += store.getMemStoreSize();
+          tempMemstoreSize += store.getSizeOfMemStore().getDataSize();
           tempStoreFileSize += store.getStorefilesSize();
 
           long storeMaxStoreFileAge = store.getMaxStoreFileAge();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
index 3fb9723..cdc11a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java
@@ -18,8 +18,14 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.Iterator;
+import java.util.SortedSet;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -42,10 +48,60 @@ public class MutableSegment extends Segment {
    * Adds the given cell into the segment
    * @param cell the cell to add
    * @param mslabUsed whether using MSLAB
-   * @return the change in the heap size
+   * @param memstoreSize
    */
-  public long add(Cell cell, boolean mslabUsed) {
-    return internalAdd(cell, mslabUsed);
+  public void add(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) {
+    internalAdd(cell, mslabUsed, memstoreSize);
+  }
+
+  public void upsert(Cell cell, long readpoint, MemstoreSize memstoreSize) {
+    internalAdd(cell, false, memstoreSize);
+
+    // Get the Cells for the row/family/qualifier regardless of timestamp.
+    // For this case we want to clean up any other puts
+    Cell firstCell = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(),
+        cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+    SortedSet<Cell> ss = this.tailSet(firstCell);
+    Iterator<Cell> it = ss.iterator();
+    // versions visible to oldest scanner
+    int versionsVisible = 0;
+    while (it.hasNext()) {
+      Cell cur = it.next();
+
+      if (cell == cur) {
+        // ignore the one just put in
+        continue;
+      }
+      // check that this is the row and column we are interested in, otherwise bail
+      if (CellUtil.matchingRows(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
+        // only remove Puts that concurrent scanners cannot possibly see
+        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) {
+          if (versionsVisible >= 1) {
+            // if we get here we have seen at least one version visible to the oldest scanner,
+            // which means we can prove that no scanner will see this version
+
+            // The duplicate cell is being removed, so account for the freed size.
+            // TODO: when the removed cell, i.e. 'cur', has its data in MSLAB, we cannot release
+            // that area; only the Cell object itself goes away, so the cellLen decrement should
+            // really be 0. Keeping the existing accounting for now; we need a way to know whether
+            // the removed cell is from MSLAB or not. Will do once HBASE-16438 is in.
+            int cellLen = getCellLength(cur);
+            long heapOverheadDelta = heapOverheadChange(cur, true);
+            this.incSize(-cellLen, -heapOverheadDelta);
+            if (memstoreSize != null) {
+              memstoreSize.decMemstoreSize(cellLen, heapOverheadDelta);
+            }
+            it.remove();
+          } else {
+            versionsVisible++;
+          }
+        }
+      } else {
+        // past the row or column, done
+        break;
+      }
+    }
   }
 
   /**
@@ -67,9 +123,4 @@ public class MutableSegment extends Segment {
   public long getMinTimestamp() {
     return this.timeRangeTracker.getMin();
   }
-
-  @Override
-  public long size() {
-    return keySize() + DEEP_OVERHEAD;
-  }
 }

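The upsert above cleans out older Put versions that no concurrent scanner can possibly see: once one version at or below the read point has been kept, every older visible Put is removed. A simplified standalone model of just that visibility rule (sequence ids stand in for full Cells; this is not the HBase implementation):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class UpsertCleanupDemo {
  public static void main(String[] args) {
    // Sequence ids of Put versions for one row/column, newest first.
    List<Long> puts = new ArrayList<>(Arrays.asList(9L, 7L, 5L, 3L));
    long readpoint = 8; // oldest scanner's read point
    int versionsVisible = 0;
    Iterator<Long> it = puts.iterator();
    while (it.hasNext()) {
      long seqId = it.next();
      // Only Puts that concurrent scanners could see are candidates.
      if (seqId <= readpoint) {
        if (versionsVisible >= 1) {
          it.remove(); // provably invisible: a newer visible version exists
        } else {
          versionsVisible++; // the first visible version must be kept
        }
      }
    }
    System.out.println(puts); // [9, 7] -- 5 and 3 were cleaned up
  }
}
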
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 3a5acfe..1b106b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -195,7 +195,11 @@ public interface Region extends ConfigurationObserver {
    */
   void updateWriteRequestsCount(long i);
 
-  /** @return memstore size for this region, in bytes */
+  /**
+   * @return memstore size for this region, in bytes. It accounts only for the data size of cells
+   *         added to the memstores of this Region, i.e. the size in bytes of key, value and tags
+   *         within the Cells. It won't consider any Java heap overhead of the cell objects.
+   */
   long getMemstoreSize();
 
   /** @return store services for this region, to access services required by store level needs */

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
index 879b573..cb8551f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java
@@ -32,43 +32,57 @@ import org.apache.hadoop.hbase.util.Bytes;
 @InterfaceAudience.Private
 public class RegionServerAccounting {
 
-  private final AtomicLong atomicGlobalMemstoreSize = new AtomicLong(0);
-  
+  private final AtomicLong globalMemstoreDataSize = new AtomicLong(0);
+  private final AtomicLong globalMemstoreHeapOverhead = new AtomicLong(0);
+
   // Store the edits size during replaying WAL. Use this to roll back the  
   // global memstore size once a region opening failed.
-  private final ConcurrentMap<byte[], AtomicLong> replayEditsPerRegion = 
-    new ConcurrentSkipListMap<byte[], AtomicLong>(Bytes.BYTES_COMPARATOR);
+  private final ConcurrentMap<byte[], MemstoreSize> replayEditsPerRegion =
+    new ConcurrentSkipListMap<byte[], MemstoreSize>(Bytes.BYTES_COMPARATOR);
 
   /**
    * @return the global Memstore size in the RegionServer
    */
   public long getGlobalMemstoreSize() {
-    return atomicGlobalMemstoreSize.get();
+    return globalMemstoreDataSize.get();
+  }
+
+  public long getGlobalMemstoreHeapOverhead() {
+    return this.globalMemstoreHeapOverhead.get();
   }
-  
+
   /**
    * @param memStoreSize the Memstore size will be added to 
    *        the global Memstore size 
-   * @return the global Memstore size in the RegionServer 
    */
-  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
-    return atomicGlobalMemstoreSize.addAndGet(memStoreSize);
+  public void incGlobalMemstoreSize(MemstoreSize memStoreSize) {
+    globalMemstoreDataSize.addAndGet(memStoreSize.getDataSize());
+    globalMemstoreHeapOverhead.addAndGet(memStoreSize.getHeapOverhead());
   }
-  
+
+  public void decGlobalMemstoreSize(MemstoreSize memStoreSize) {
+    globalMemstoreDataSize.addAndGet(-memStoreSize.getDataSize());
+    globalMemstoreHeapOverhead.addAndGet(-memStoreSize.getHeapOverhead());
+  }
+
   /***
    * Add memStoreSize to replayEditsPerRegion.
    * 
    * @param regionName region name.
    * @param memStoreSize the Memstore size will be added to replayEditsPerRegion.
-   * @return the replay edits size for the region.
    */
-  public long addAndGetRegionReplayEditsSize(byte[] regionName, long memStoreSize) {
-    AtomicLong replayEdistsSize = replayEditsPerRegion.get(regionName);
+  public void addRegionReplayEditsSize(byte[] regionName, MemstoreSize memStoreSize) {
+    MemstoreSize replayEdistsSize = replayEditsPerRegion.get(regionName);
+    // All ops on the same MemstoreSize object are done by a single thread, sequentially. This
+    // method is first called to increment the per-region replay edits size, and then either
+    // rollbackRegionReplayEditsSize or clearRegionReplayEditsSize is called depending on the
+    // result of the region open operation. No need to handle multi-thread issues on one
+    // region's entry in this Map.
     if (replayEdistsSize == null) {
-      replayEdistsSize = new AtomicLong(0);
+      replayEdistsSize = new MemstoreSize();
       replayEditsPerRegion.put(regionName, replayEdistsSize);
     }
-    return replayEdistsSize.addAndGet(memStoreSize);
+    replayEdistsSize.incMemstoreSize(memStoreSize);
   }
 
   /**
@@ -76,16 +90,13 @@ public class RegionServerAccounting {
    * can't be opened.
    * 
    * @param regionName the region which could not open.
-   * @return the global Memstore size in the RegionServer
    */
-  public long rollbackRegionReplayEditsSize(byte[] regionName) {
-    AtomicLong replayEditsSize = replayEditsPerRegion.get(regionName);
-    long editsSizeLong = 0L;
+  public void rollbackRegionReplayEditsSize(byte[] regionName) {
+    MemstoreSize replayEditsSize = replayEditsPerRegion.get(regionName);
     if (replayEditsSize != null) {
-      editsSizeLong = -replayEditsSize.get();
       clearRegionReplayEditsSize(regionName);
+      decGlobalMemstoreSize(replayEditsSize);
     }
-    return addAndGetGlobalMemstoreSize(editsSizeLong);
   }
 
   /**
@@ -96,5 +107,4 @@ public class RegionServerAccounting {
   public void clearRegionReplayEditsSize(byte[] regionName) {
     replayEditsPerRegion.remove(regionName);
   }
-  
 }

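The accounting change above replaces one AtomicLong with a pair of counters that are always moved together by the same MemstoreSize-style delta, which is what makes the replay-edits rollback symmetric. A standalone sketch of that inc/dec symmetry (class and field names are illustrative):

import java.util.concurrent.atomic.AtomicLong;

public class AccountingDemo {
  private final AtomicLong globalDataSize = new AtomicLong();
  private final AtomicLong globalHeapOverhead = new AtomicLong();

  void inc(long data, long overhead) {
    globalDataSize.addAndGet(data);
    globalHeapOverhead.addAndGet(overhead);
  }

  void dec(long data, long overhead) {
    globalDataSize.addAndGet(-data);
    globalHeapOverhead.addAndGet(-overhead);
  }

  public static void main(String[] args) {
    AccountingDemo acct = new AccountingDemo();
    acct.inc(2048, 256); // replayed edits for a region
    acct.dec(2048, 256); // region failed to open: roll the same delta back
    System.out.println(acct.globalDataSize.get() + "/" + acct.globalHeapOverhead.get()); // 0/0
  }
}
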
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
index e481a63..c7f2ce6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
@@ -25,8 +25,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.util.StealJobQueue;
 import org.apache.hadoop.hbase.wal.WAL;
 
 /**
@@ -37,7 +35,6 @@ import org.apache.hadoop.hbase.wal.WAL;
  * take occasional lock and update size counters at the region level.
  */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public class RegionServicesForStores {
 
   private static final int POOL_SIZE = 10;
@@ -68,8 +65,8 @@ public class RegionServicesForStores {
     region.unblockUpdates();
   }
 
-  public long addAndGetGlobalMemstoreSize(long size) {
-    return region.addAndGetGlobalMemstoreSize(size);
+  public void addMemstoreSize(MemstoreSize size) {
+    region.addAndGetMemstoreSize(size);
   }
 
   public HRegionInfo getRegionInfo() {
@@ -91,7 +88,7 @@ public class RegionServicesForStores {
   }
 
   // methods for tests
-  long getGlobalMemstoreTotalSize() {
+  long getMemstoreSize() {
     return region.getMemstoreSize();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
index 864e256..afdfe6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -55,11 +55,12 @@ public abstract class Segment {
 
   private AtomicReference<CellSet> cellSet= new AtomicReference<CellSet>();
   private final CellComparator comparator;
-  private long minSequenceId;
+  protected long minSequenceId;
   private MemStoreLAB memStoreLAB;
   // Sum of sizes of all Cells added to this Segment. Cell's heapSize is considered. This is not
   // including the heap overhead of this class.
-  protected final AtomicLong size;
+  protected final AtomicLong dataSize;
+  protected final AtomicLong heapOverhead;
   protected final TimeRangeTracker timeRangeTracker;
   protected volatile boolean tagsPresent;
 
@@ -69,7 +70,8 @@ public abstract class Segment {
     this.comparator = comparator;
     this.minSequenceId = Long.MAX_VALUE;
     this.memStoreLAB = memStoreLAB;
-    this.size = new AtomicLong(0);
+    this.dataSize = new AtomicLong(0);
+    this.heapOverhead = new AtomicLong(0);
     this.tagsPresent = false;
     this.timeRangeTracker = new TimeRangeTracker();
   }
@@ -79,7 +81,8 @@ public abstract class Segment {
     this.comparator = segment.getComparator();
     this.minSequenceId = segment.getMinSequenceId();
     this.memStoreLAB = segment.getMemStoreLAB();
-    this.size = new AtomicLong(segment.keySize());
+    this.dataSize = new AtomicLong(segment.keySize());
+    this.heapOverhead = new AtomicLong(segment.heapOverhead.get());
     this.tagsPresent = segment.isTagsPresent();
     this.timeRangeTracker = segment.getTimeRangeTracker();
   }
@@ -154,7 +157,7 @@ public abstract class Segment {
    * Get cell length after serialized in {@link KeyValue}
    */
   @VisibleForTesting
-  int getCellLength(Cell cell) {
+  static int getCellLength(Cell cell) {
     return KeyValueUtil.length(cell);
   }
 
@@ -193,19 +196,26 @@ public abstract class Segment {
    * @return Sum of all cell's size.
    */
   public long keySize() {
-    return this.size.get();
+    return this.dataSize.get();
   }
 
   /**
-   * @return the heap size of the segment
+   * @return The heap overhead of this segment.
    */
-  public abstract long size();
+  public long heapOverhead() {
+    return this.heapOverhead.get();
+  }
 
   /**
    * Updates the data size and heap overhead counters of the segment by the given deltas
    */
-  public void incSize(long delta) {
-    this.size.addAndGet(delta);
+  protected void incSize(long delta, long heapOverhead) {
+    this.dataSize.addAndGet(delta);
+    this.heapOverhead.addAndGet(heapOverhead);
+  }
+
+  protected void incHeapOverheadSize(long delta) {
+    this.heapOverhead.addAndGet(delta);
   }
 
   public long getMinSequenceId() {
@@ -252,36 +262,47 @@ public abstract class Segment {
     return comparator;
   }
 
-  protected long internalAdd(Cell cell, boolean mslabUsed) {
+  protected void internalAdd(Cell cell, boolean mslabUsed, MemstoreSize memstoreSize) {
     boolean succ = getCellSet().add(cell);
-    long s = updateMetaInfo(cell, succ, mslabUsed);
-    return s;
+    updateMetaInfo(cell, succ, mslabUsed, memstoreSize);
   }
 
-  protected long updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed) {
-    long s = heapSizeChange(cellToAdd, succ);
+  protected void updateMetaInfo(Cell cellToAdd, boolean succ, boolean mslabUsed,
+      MemstoreSize memstoreSize) {
+    long cellSize = 0;
     // If there's already a same cell in the CellSet and we are using MSLAB, we must count in the
     // MSLAB allocation size as well, or else there will be memory leak (occupied heap size larger
     // than the counted number)
-    if (!succ && mslabUsed) {
-      s += getCellLength(cellToAdd);
+    if (succ || mslabUsed) {
+      cellSize = getCellLength(cellToAdd);
+    }
+    long overhead = heapOverheadChange(cellToAdd, succ);
+    incSize(cellSize, overhead);
+    if (memstoreSize != null) {
+      memstoreSize.incMemstoreSize(cellSize, overhead);
     }
     getTimeRangeTracker().includeTimestamp(cellToAdd);
-    incSize(s);
     minSequenceId = Math.min(minSequenceId, cellToAdd.getSequenceId());
     // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
     // When we use ACL CP or Visibility CP which deals with Tags during
     // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
     // parse the byte[] to identify the tags length.
-    if( cellToAdd.getTagsLength() > 0) {
+    if (cellToAdd.getTagsLength() > 0) {
       tagsPresent = true;
     }
-    return s;
   }
 
-  protected long heapSizeChange(Cell cell, boolean succ) {
-    return succ ? ClassSize
-        .align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell)) : 0;
+  protected long heapOverheadChange(Cell cell, boolean succ) {
+    if (succ) {
+      if (cell instanceof ExtendedCell) {
+        return ClassSize
+            .align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ((ExtendedCell) cell).heapOverhead());
+      }
+      // All cells in server side will be of type ExtendedCell. If not just go with estimation on
+      // the heap overhead considering it is KeyValue.
+      return ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD);
+    }
+    return 0;
   }
 
   /**
@@ -314,7 +335,7 @@ public abstract class Segment {
     res += "isEmpty "+(isEmpty()?"yes":"no")+"; ";
     res += "cellCount "+getCellsCount()+"; ";
     res += "cellsSize "+keySize()+"; ";
-    res += "heapSize "+size()+"; ";
+    res += "heapOverhead "+heapOverhead()+"; ";
     res += "Min ts "+getMinTimestamp()+"; ";
     return res;
   }

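heapOverheadChange above charges, per successful insert, one skip-list map entry plus the cell object's fixed overhead, aligned to 8 bytes; a duplicate that fails to enter the CellSet costs nothing. A back-of-the-envelope sketch with assumed constants (the real values come from HBase's ClassSize and KeyValue.FIXED_OVERHEAD):

public class OverheadDemo {
  static final long SKIPLIST_ENTRY = 40; // assumed map-entry overhead
  static final long KV_FIXED = 56;       // assumed fixed KeyValue object overhead

  // 8-byte alignment, mirroring what ClassSize.align does.
  static long align(long num) {
    return (num + 7) & ~7L;
  }

  static long heapOverheadChange(boolean addedToSet) {
    // Only a successful insert creates a new map entry plus a cell object.
    return addedToSet ? align(SKIPLIST_ENTRY + KV_FIXED) : 0;
  }

  public static void main(String[] args) {
    System.out.println(heapOverheadChange(true));  // 96
    System.out.println(heapOverheadChange(false)); // 0: duplicate, no new entry
  }
}
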
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index ef6e400..30e6a74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -255,22 +255,45 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
 
   /**
    * @return The size of this store's memstore, in bytes
+   * @deprecated Since 2.0 and will be removed in 3.0. Use {@link #getSizeOfMemStore()} instead.
    */
+  @Deprecated
   long getMemStoreSize();
 
   /**
+   * @return The size of this store's memstore.
+   */
+  MemstoreSize getSizeOfMemStore();
+
+  /**
    * @return The amount of memory we could flush from this memstore; usually this is equal to
    * {@link #getMemStoreSize()} unless we are carrying snapshots and then it will be the size of
    * outstanding snapshots.
+   * @deprecated Since 2.0 and will be removed in 3.0. Use {@link #getSizeToFlush()} instead.
    */
+  @Deprecated
   long getFlushableSize();
 
   /**
+   * @return The amount of memory we could flush from this memstore; usually this is equal to
+   * {@link #getSizeOfMemStore()} unless we are carrying snapshots and then it will be the size of
+   * outstanding snapshots.
+   */
+  MemstoreSize getSizeToFlush();
+
+  /**
    * Returns the memstore snapshot size
    * @return size of the memstore snapshot
+   * @deprecated Since 2.0 and will be removed in 3.0. Use {@link #getSizeOfSnapshot()} instead.
    */
+  @Deprecated
   long getSnapshotSize();
 
+  /**
+   * @return size of the memstore snapshot
+   */
+  MemstoreSize getSizeOfSnapshot();
+
   HColumnDescriptor getFamily();
 
   /**

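For callers, the long-returning getters on Store are deprecated in favor of the MemstoreSize-returning ones. A small migration sketch, assuming the patched hbase-server module is on the classpath (Store is the interface shown above):

import org.apache.hadoop.hbase.regionserver.Store;

public final class StoreSizeMigration {
  // Each deprecated long getter maps to a MemstoreSize getter plus getDataSize().
  static long memstoreDataBytes(Store store) {
    return store.getSizeOfMemStore().getDataSize(); // was store.getMemStoreSize()
  }

  static long flushableDataBytes(Store store) {
    return store.getSizeToFlush().getDataSize(); // was store.getFlushableSize()
  }

  static long snapshotDataBytes(Store store) {
    return store.getSizeOfSnapshot().getDataSize(); // was store.getSnapshotSize()
  }
}
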
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index f4fd603..53488ec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.MemstoreSize;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -61,7 +62,7 @@ public class TestClientPushback {
   private static final TableName tableName = TableName.valueOf("client-pushback");
   private static final byte[] family = Bytes.toBytes("f");
   private static final byte[] qualifier = Bytes.toBytes("q");
-  private static final long flushSizeBytes = 1024;
+  private static final long flushSizeBytes = 256;
 
   @BeforeClass
   public static void setupCluster() throws Exception{
@@ -103,7 +104,8 @@ public class TestClientPushback {
     table.put(p);
 
     // get the current load on RS. Hopefully memstore isn't flushed since we wrote the data
-    int load = (int)((((HRegion)region).addAndGetGlobalMemstoreSize(0) * 100) / flushSizeBytes);
+    int load = (int) ((((HRegion) region).addAndGetMemstoreSize(new MemstoreSize(0, 0)) * 100)
+        / flushSizeBytes);
     LOG.debug("Done writing some data to "+tableName);
 
     // get the stats for the region hosting our table

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 84efb09..7dd9479 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -40,6 +38,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
@@ -121,8 +120,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     final KeyValue kv2 = new KeyValue(two, f, q, 10, v);
 
     // use case 1: both kvs in kvset
-    this.memstore.add(kv1.clone());
-    this.memstore.add(kv2.clone());
+    this.memstore.add(kv1.clone(), null);
+    this.memstore.add(kv2.clone(), null);
     verifyScanAcrossSnapshot2(kv1, kv2);
 
     // use case 2: both kvs in snapshot
@@ -132,12 +131,12 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     // use case 3: first in snapshot second in kvset
     this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
         CellComparator.COMPARATOR, store, regionServicesForStores);
-    this.memstore.add(kv1.clone());
+    this.memstore.add(kv1.clone(), null);
     // As compaction is starting in the background the repetition
     // of the k1 might be removed BUT the scanners created earlier
     // should look on the OLD MutableCellSetSegment, so this should be OK...
     this.memstore.snapshot();
-    this.memstore.add(kv2.clone());
+    this.memstore.add(kv2.clone(), null);
     verifyScanAcrossSnapshot2(kv1,kv2);
   }
 
@@ -173,7 +172,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     Thread.sleep(1);
     addRows(this.memstore);
     Cell closestToEmpty = ((CompactingMemStore)this.memstore).getNextRow(KeyValue.LOWESTKEY);
-    assertTrue(KeyValue.COMPARATOR.compareRows(closestToEmpty,
+    assertTrue(CellComparator.COMPARATOR.compareRows(closestToEmpty,
         new KeyValue(Bytes.toBytes(0), System.currentTimeMillis())) == 0);
     for (int i = 0; i < ROW_COUNT; i++) {
       Cell nr = ((CompactingMemStore)this.memstore).getNextRow(new KeyValue(Bytes.toBytes(i),
@@ -181,7 +180,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       if (i + 1 == ROW_COUNT) {
         assertEquals(nr, null);
       } else {
-        assertTrue(KeyValue.COMPARATOR.compareRows(nr,
+        assertTrue(CellComparator.COMPARATOR.compareRows(nr,
             new KeyValue(Bytes.toBytes(i + 1), System.currentTimeMillis())) == 0);
       }
     }
@@ -226,9 +225,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     byte[] val = Bytes.toBytes("testval");
 
     //Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val));
-    memstore.add(new KeyValue(row, fam, qf2, val));
-    memstore.add(new KeyValue(row, fam, qf3, val));
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
     //Pushing to pipeline
     ((CompactingMemStore)memstore).flushInMemory();
     assertEquals(0, memstore.getSnapshot().getCellsCount());
@@ -237,57 +236,11 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     assertEquals(3, memstore.getSnapshot().getCellsCount());
     //Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf4, val));
-    memstore.add(new KeyValue(row, fam, qf5, val));
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
   }
 
-
-  ////////////////////////////////////
-  //Test for upsert with MSLAB
-  ////////////////////////////////////
-
-  /**
-   * Test a pathological pattern that shows why we can't currently
-   * use the MSLAB for upsert workloads. This test inserts data
-   * in the following pattern:
-   *
-   * - row0001 through row1000 (fills up one 2M Chunk)
-   * - row0002 through row1001 (fills up another 2M chunk, leaves one reference
-   *   to the first chunk
-   * - row0003 through row1002 (another chunk, another dangling reference)
-   *
-   * This causes OOME pretty quickly if we use MSLAB for upsert
-   * since each 2M chunk is held onto by a single reference.
-   */
-  @Override
-  @Test
-  public void testUpsertMSLAB() throws Exception {
-
-    int ROW_SIZE = 2048;
-    byte[] qualifier = new byte[ROW_SIZE - 4];
-
-    MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
-    for (int i = 0; i < 3; i++) { System.gc(); }
-    long usageBefore = bean.getHeapMemoryUsage().getUsed();
-
-    long size = 0;
-    long ts=0;
-
-    for (int newValue = 0; newValue < 1000; newValue++) {
-      for (int row = newValue; row < newValue + 1000; row++) {
-        byte[] rowBytes = Bytes.toBytes(row);
-        size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts);
-      }
-    }
-    System.out.println("Wrote " + ts + " vals");
-    for (int i = 0; i < 3; i++) { System.gc(); }
-    long usageAfter = bean.getHeapMemoryUsage().getUsed();
-    System.out.println("Memory used: " + (usageAfter - usageBefore)
-        + " (heapsize: " + memstore.heapSize() +
-        " size: " + size + ")");
-  }
-
   ////////////////////////////////////
   // Test for periodic memstore flushes
   // based on time of oldest edit
@@ -302,7 +255,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
   @Override
   @Test
   public void testUpsertMemstoreSize() throws Exception {
-    long oldSize = memstore.size();
+    MemstoreSize oldSize = memstore.size();
 
     List<Cell> l = new ArrayList<Cell>();
     KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
@@ -316,9 +269,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     l.add(kv2);
     l.add(kv3);
 
-    this.memstore.upsert(l, 2);// readpoint is 2
-    long newSize = this.memstore.size();
-    assert (newSize > oldSize);
+    this.memstore.upsert(l, 2, null); // readpoint is 2
+    MemstoreSize newSize = this.memstore.size();
+    assert (newSize.getDataSize() > oldSize.getDataSize());
     //The kv1 should be removed.
     assert (memstore.getActive().getCellsCount() == 2);
 
@@ -326,7 +279,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     kv4.setSequenceId(1);
     l.clear();
     l.add(kv4);
-    this.memstore.upsert(l, 3);
+    this.memstore.upsert(l, 3, null);
     assertEquals(newSize, this.memstore.size());
     //The kv2 should be removed.
     assert (memstore.getActive().getCellsCount() == 2);
@@ -348,7 +301,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       assertEquals(t, Long.MAX_VALUE);
 
       // test the case that the timeOfOldestEdit is updated after a KV add
-      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
       t = memstore.timeOfOldestEdit();
       assertTrue(t == 1234);
       // The method will also assert
@@ -356,7 +309,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       t = runSnapshot(memstore, true);
 
       // test the case that the timeOfOldestEdit is updated after a KV delete
-      memstore.delete(KeyValueTestUtil.create("r", "f", "q", 100, "v"));
+      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
       t = memstore.timeOfOldestEdit();
       assertTrue(t == 1234);
      t = runSnapshot(memstore, true);
@@ -366,7 +319,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
       kv1.setSequenceId(100);
       l.add(kv1);
-      memstore.upsert(l, 1000);
+      memstore.upsert(l, 1000, null);
       t = memstore.timeOfOldestEdit();
       assertTrue(t == 1234);
     } finally {
@@ -384,7 +337,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     MemStoreSnapshot snapshot = hmc.snapshot();
     if (useForce) {
       // Make some assertions about what just happened.
-      assertTrue("History size has not increased", oldHistorySize < snapshot.getSize());
+      assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize());
       long t = hmc.timeOfOldestEdit();
       assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
       hmc.clearSnapshot(snapshot.getId());
@@ -421,9 +374,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     byte[] val = Bytes.toBytes("testval");
 
     // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val));
-    memstore.add(new KeyValue(row, fam, qf2, val));
-    memstore.add(new KeyValue(row, fam, qf3, val));
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
 
     // Creating a snapshot
     MemStoreSnapshot snapshot = memstore.snapshot();
@@ -431,8 +384,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf4, val));
-    memstore.add(new KeyValue(row, fam, qf5, val));
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
     memstore.clearSnapshot(snapshot.getId());
 
@@ -456,9 +409,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     byte[] val = Bytes.toBytes("testval");
 
     // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, val));
-    memstore.add(new KeyValue(row, fam, qf2, val));
-    memstore.add(new KeyValue(row, fam, qf3, val));
+    memstore.add(new KeyValue(row, fam, qf1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, val), null);
 
     // Creating a snapshot
     MemStoreSnapshot snapshot = memstore.snapshot();
@@ -466,8 +419,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf4, val));
-    memstore.add(new KeyValue(row, fam, qf5, val));
+    memstore.add(new KeyValue(row, fam, qf4, val), null);
+    memstore.add(new KeyValue(row, fam, qf5, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
 
     // opening scanner before clear the snapshot
@@ -491,8 +444,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     snapshot = memstore.snapshot();
     // Adding more value
-    memstore.add(new KeyValue(row, fam, qf6, val));
-    memstore.add(new KeyValue(row, fam, qf7, val));
+    memstore.add(new KeyValue(row, fam, qf6, val), null);
+    memstore.add(new KeyValue(row, fam, qf7, val), null);
     // opening scanners
     scanners = memstore.getScanners(0);
     // close scanners before clear the snapshot
@@ -521,9 +474,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     byte[] val = Bytes.toBytes("testval");
 
     // Setting up memstore
-    memstore.add(new KeyValue(row, fam, qf1, 1, val));
-    memstore.add(new KeyValue(row, fam, qf2, 1, val));
-    memstore.add(new KeyValue(row, fam, qf3, 1, val));
+    memstore.add(new KeyValue(row, fam, qf1, 1, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, 1, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, 1, val), null);
 
     // Creating a pipeline
     ((CompactingMemStore)memstore).disableCompaction();
@@ -531,8 +484,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf1, 2, val));
-    memstore.add(new KeyValue(row, fam, qf2, 2, val));
+    memstore.add(new KeyValue(row, fam, qf1, 2, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, 2, val), null);
     assertEquals(2, memstore.getActive().getCellsCount());
 
     // pipeline bucket 2
@@ -547,9 +500,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     // Adding value to "new" memstore
     assertEquals(0, memstore.getActive().getCellsCount());
-    memstore.add(new KeyValue(row, fam, qf3, 3, val));
-    memstore.add(new KeyValue(row, fam, qf2, 3, val));
-    memstore.add(new KeyValue(row, fam, qf1, 3, val));
+    memstore.add(new KeyValue(row, fam, qf3, 3, val), null);
+    memstore.add(new KeyValue(row, fam, qf2, 3, val), null);
+    memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
     assertEquals(3, memstore.getActive().getCellsCount());
 
     assertTrue(chunkPool.getPoolSize() == 0);
@@ -570,8 +523,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
 
     snapshot = memstore.snapshot();
     // Adding more value
-    memstore.add(new KeyValue(row, fam, qf2, 4, val));
-    memstore.add(new KeyValue(row, fam, qf3, 4, val));
+    memstore.add(new KeyValue(row, fam, qf2, 4, val), null);
+    memstore.add(new KeyValue(row, fam, qf3, 4, val), null);
     // opening scanners
     scanners = memstore.getScanners(0);
     // close scanners before clear the snapshot
@@ -597,20 +550,27 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
 
     // test 1 bucket
-    addRowsByKeys(memstore, keys1);
-    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
+    int totalCellsLen = addRowsByKeys(memstore, keys1);
+    long totalHeapOverhead = 4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
-    long size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
+    // so adjust totalCellsLen accordingly.
+    totalCellsLen = (totalCellsLen * 3) / 4;
+    totalHeapOverhead = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(3, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
   }
@@ -624,11 +584,13 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
 
-    addRowsByKeys(memstore, keys1);
+    int totalCellsLen1 = addRowsByKeys(memstore, keys1);
+    long totalHeapOverhead = 4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
 
-    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
-    long size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     int counter = 0;
     for ( Segment s : memstore.getSegments()) {
@@ -636,22 +598,32 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     }
     assertEquals(3, counter);
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys2);
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
+    // so adjust totalCellsLen accordingly.
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    totalHeapOverhead = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
+
+    int totalCellsLen2 = addRowsByKeys(memstore, keys2);
+    totalHeapOverhead += 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize());
+    totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    totalHeapOverhead = 4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(4, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
   }
@@ -666,33 +638,47 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     String[] keys2 = { "A", "B", "D" };
     String[] keys3 = { "D", "B", "B" };
 
-    addRowsByKeys(memstore, keys1);
-    assertEquals(496, region.getMemstoreSize());
+    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells.
+    assertEquals(totalCellsLen1, region.getMemstoreSize());
+    long totalHeapOverhead = 4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
-    long size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and compact
 
-    String tstStr = "\n\nFlushable size after first flush in memory:" + size
-        + ". Is MemmStore in compaction?:" + ((CompactingMemStore)memstore).isMemStoreFlushingInMemory();
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys2);
-
-    tstStr += " After adding second part of the keys. Memstore size: " +
-        region.getMemstoreSize() + ", Memstore Total Size: " +
-        regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n";
-
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    ((CompactingMemStore)memstore).disableCompaction();
+    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
+    // totalCellsLen
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
+    totalHeapOverhead = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
+
+    int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells.
+    long totalHeapOverhead2 = 3
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
+
+    ((CompactingMemStore) memstore).disableCompaction();
     size = memstore.getFlushableSize();
     ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline without compaction
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys3);
-    assertEquals(1016, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // No change in the cells data size (i.e. memstore size) as there is no compaction.
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
+
+    int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    long totalHeapOverhead3 = 3
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalHeapOverhead + totalHeapOverhead2 + totalHeapOverhead3,
+        ((CompactingMemStore) memstore).heapOverhead());
 
     ((CompactingMemStore)memstore).enableCompaction();
     size = memstore.getFlushableSize();
@@ -701,34 +687,47 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(384, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
+    // Out of total 10, only 4 cells are unique
+    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
+    totalCellsLen3 = 0;// All duplicated cells.
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    // Only 4 unique cells left
+    assertEquals(4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY),
+        ((CompactingMemStore) memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(4, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
 
     //assertTrue(tstStr, false);
   }
 
-  private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
+  private int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
     byte[] fam = Bytes.toBytes("testfamily");
     byte[] qf = Bytes.toBytes("testqualifier");
     long size = hmc.getActive().keySize();
+    long heapOverhead = hmc.getActive().heapOverhead();
+    int totalLen = 0;
     for (int i = 0; i < keys.length; i++) {
       long timestamp = System.currentTimeMillis();
       Threads.sleep(1); // to make sure each kv gets a different ts
       byte[] row = Bytes.toBytes(keys[i]);
       byte[] val = Bytes.toBytes(keys[i] + i);
       KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-      hmc.add(kv);
+      totalLen += kv.getLength();
+      hmc.add(kv, null);
       LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
     }
-    regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().keySize() - size);
+    regionServicesForStores.addMemstoreSize(new MemstoreSize(hmc.getActive().keySize() - size,
+        hmc.getActive().heapOverhead() - heapOverhead));
+    return totalLen;
   }
 
   private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {

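A note on the call pattern these updated tests move to: add() no longer returns a size; callers hand in a MemstoreSize accumulator and the memstore adds both its data-size and heap-overhead deltas to it (callers that do not need the delta, like the fixture code above, pass null). A minimal caller-side sketch using only names that appear in this patch; the cells iterable and the surrounding test scaffolding are assumed:

    // Accumulate per-cell deltas into one MemstoreSize, then publish once.
    MemstoreSize memstoreSize = new MemstoreSize();
    for (Cell cell : cells) {
      memstore.add(cell, memstoreSize); // adds data-size + heap-overhead deltas
    }
    // One region-level accounting update for the whole batch, as in
    // addRowsByKeys() above.
    regionServicesForStores.addMemstoreSize(memstoreSize);
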
http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
index 6499251..c72cae3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingToCellArrayMapMemStore.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.Threads;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -78,27 +78,34 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
 
     // test 1 bucket
-    addRowsByKeys(memstore, keys1);
-    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
+    long totalCellsLen = addRowsByKeys(memstore, keys1);
+    long totalHeapOverhead = 4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
 
     assertEquals(4, memstore.getActive().getCellsCount());
-    long size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
     while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
+    // totalCellsLen
+    totalCellsLen = (totalCellsLen * 3) / 4;
+    assertEquals(totalCellsLen, regionServicesForStores.getMemstoreSize());
+    totalHeapOverhead = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalHeapOverhead, ((CompactingMemStore)memstore).heapOverhead());
     for ( Segment s : memstore.getSegments()) {
       counter += s.getCellsCount();
     }
     assertEquals(3, counter);
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(3, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
   }
@@ -108,13 +115,12 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     String[] keys1 = { "A", "A", "B", "C" };
     String[] keys2 = { "A", "B", "D" };
 
-    addRowsByKeys(memstore, keys1);
-    assertEquals(496, regionServicesForStores.getGlobalMemstoreTotalSize());
-    long size = memstore.getFlushableSize();
-
-//    assertTrue(
-//        "\n\n<<< This is the active size with 4 keys - " + memstore.getActive().getSize()
-//            + ". This is the memstore flushable size - " + size + "\n",false);
+    long totalCellsLen1 = addRowsByKeys(memstore, keys1);
+    long totalHeapOverhead1 = 4
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1, ((CompactingMemStore) memstore).heapOverhead());
+    MemstoreSize size = memstore.getFlushableSize();
 
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
     while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
@@ -126,10 +132,19 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     }
     assertEquals(3,counter);
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys2);
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
+    // totalCellsLen
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    totalHeapOverhead1 = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1, ((CompactingMemStore) memstore).heapOverhead());
+
+    long totalCellsLen2 = addRowsByKeys(memstore, keys2);
+    long totalHeapOverhead2 = 3
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1 + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
@@ -147,14 +162,18 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
       counter += s.getCellsCount();
     }
     assertEquals(4,counter);
-    assertEquals(368, regionServicesForStores.getGlobalMemstoreTotalSize());
+    totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    totalHeapOverhead2 = 1 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalHeapOverhead1 + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(4, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
   }
@@ -165,36 +184,49 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
     String[] keys2 = { "A", "B", "D" };
     String[] keys3 = { "D", "B", "B" };
 
-    addRowsByKeys(memstore, keys1);
-    assertEquals(496, region.getMemstoreSize());
+    long totalCellsLen1 = addRowsByKeys(memstore, keys1);
+    long totalHeapOverhead1 = 4
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen1, region.getMemstoreSize());
+    assertEquals(totalHeapOverhead1, ((CompactingMemStore) memstore).heapOverhead());
 
-    long size = memstore.getFlushableSize();
+    MemstoreSize size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
 
-    String tstStr = "\n\nFlushable size after first flush in memory:" + size + ". Is MemmStore in compaction?:"
-        + ((CompactingMemStore) memstore).isMemStoreFlushingInMemory();
     while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(264, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys2);
+    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
+    // totalCellsLen
+    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
+    totalHeapOverhead1 = 3 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY);
+    assertEquals(totalCellsLen1, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1, ((CompactingMemStore) memstore).heapOverhead());
 
-    tstStr += " After adding second part of the keys. Memstore size: " +
-        region.getMemstoreSize() + ", Memstore Total Size: " +
-        regionServicesForStores.getGlobalMemstoreTotalSize() + "\n\n";
+    long totalCellsLen2 = addRowsByKeys(memstore, keys2);
+    long totalHeapOverhead2 = 3
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
 
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1 + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
 
     ((CompactingMemStore) memstore).disableCompaction();
     size = memstore.getFlushableSize();
     ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(640, regionServicesForStores.getGlobalMemstoreTotalSize());
-
-    addRowsByKeys(memstore, keys3);
-    assertEquals(1016, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1 + totalHeapOverhead2,
+        ((CompactingMemStore) memstore).heapOverhead());
+
+    long totalCellsLen3 = addRowsByKeys(memstore, keys3);
+    long totalHeapOverhead3 = 3
+        * (KeyValue.FIXED_OVERHEAD + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    assertEquals(totalHeapOverhead1 + totalHeapOverhead2 + totalHeapOverhead3,
+        ((CompactingMemStore) memstore).heapOverhead());
 
     ((CompactingMemStore) memstore).enableCompaction();
     size = memstore.getFlushableSize();
@@ -203,14 +235,22 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
       Threads.sleep(10);
     }
     assertEquals(0, memstore.getSnapshot().getCellsCount());
-    assertEquals(384, regionServicesForStores.getGlobalMemstoreTotalSize());
+    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
+    // Out of total 10, only 4 cells are unique
+    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
+    totalCellsLen3 = 0;// All duplicated cells.
+    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
+        regionServicesForStores.getMemstoreSize());
+    // Only 4 unique cells left
+    assertEquals(4 * (KeyValue.FIXED_OVERHEAD + ClassSize.CELL_ARRAY_MAP_ENTRY),
+        ((CompactingMemStore) memstore).heapOverhead());
 
     size = memstore.getFlushableSize();
     MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
-    region.addAndGetGlobalMemstoreSize(-size);  // simulate flusher
+    region.decrMemstoreSize(size);  // simulate flusher
     ImmutableSegment s = memstore.getSnapshot();
     assertEquals(4, s.getCellsCount());
-    assertEquals(0, regionServicesForStores.getGlobalMemstoreTotalSize());
+    assertEquals(0, regionServicesForStores.getMemstoreSize());
 
     memstore.clearSnapshot(snapshot.getId());
 
@@ -339,24 +379,25 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
         byte[] qf = Bytes.toBytes("testqualifier"+j);
         byte[] val = Bytes.toBytes(keys[i] + j);
         KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-        hmc.add(kv);
+        hmc.add(kv, null);
       }
     }
   }
 
-  private void addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
+  private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
     byte[] fam = Bytes.toBytes("testfamily");
     byte[] qf = Bytes.toBytes("testqualifier");
-    long size = hmc.getActive().size();//
+    MemstoreSize memstoreSize = new MemstoreSize();
     for (int i = 0; i < keys.length; i++) {
       long timestamp = System.currentTimeMillis();
       Threads.sleep(1); // to make sure each kv gets a different ts
       byte[] row = Bytes.toBytes(keys[i]);
       byte[] val = Bytes.toBytes(keys[i] + i);
       KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
-      hmc.add(kv);
+      hmc.add(kv, memstoreSize);
       LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
     }
-    regionServicesForStores.addAndGetGlobalMemstoreSize(hmc.getActive().size() - size);//
+    regionServicesForStores.addMemstoreSize(memstoreSize);
+    return memstoreSize.getDataSize();
   }
 }
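
The expected-overhead arithmetic repeated in the assertions above follows a single rule: each cell costs KeyValue.FIXED_OVERHEAD plus one map-entry constant, and the constant depends on the segment's backing map (CONCURRENT_SKIPLISTMAP_ENTRY for the active ConcurrentSkipListMap segment, CELL_ARRAY_MAP_ENTRY once an in-memory flush/compaction has produced a CellArrayMap). A hypothetical helper, not part of the patch, that captures the inline arithmetic:

    // Mirrors the inline expectation math in the assertions above.
    // Hypothetical helper; constants are the ones the patch uses.
    static long expectedHeapOverhead(int cellCount, boolean inCellArrayMap) {
      long perEntry = inCellArrayMap ? ClassSize.CELL_ARRAY_MAP_ENTRY
          : ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY;
      return cellCount * (KeyValue.FIXED_OVERHEAD + perEntry);
    }

For example, expectedHeapOverhead(4, false) is the value asserted right after adding the four keys1 cells, and expectedHeapOverhead(3, true) the value asserted after the first in-memory compaction removes the duplicate.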

