hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anoopsamj...@apache.org
Subject [4/4] hbase git commit: HBASE-16747 Track memstore data size and heap overhead separately.
Date Sun, 30 Oct 2016 06:51:12 GMT
HBASE-16747 Track memstore data size and heap overhead separately.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ba6d9523
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ba6d9523
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ba6d9523

Branch: refs/heads/master
Commit: ba6d95232401ce533fb8c121ade7f4a864d06f12
Parents: 6127753
Author: anoopsamjohn <anoopsamjohn@gmail.com>
Authored: Sun Oct 30 12:20:46 2016 +0530
Committer: anoopsamjohn <anoopsamjohn@gmail.com>
Committed: Sun Oct 30 12:20:46 2016 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/CellUtil.java  |   6 +
 .../org/apache/hadoop/hbase/ExtendedCell.java   |   5 +
 .../java/org/apache/hadoop/hbase/KeyValue.java  |  22 +-
 .../apache/hadoop/hbase/OffheapKeyValue.java    |   9 +-
 .../apache/hadoop/hbase/SizeCachedKeyValue.java |  11 +-
 .../io/encoding/BufferedDataBlockEncoder.java   |  10 +
 .../hbase/mob/DefaultMobStoreFlusher.java       |   2 +-
 .../hbase/regionserver/AbstractMemStore.java    | 209 ++-------------
 .../hbase/regionserver/CompactingMemStore.java  |  51 ++--
 .../hbase/regionserver/CompactionPipeline.java  |  43 +--
 .../hbase/regionserver/DefaultMemStore.java     |  39 ++-
 .../hbase/regionserver/DefaultStoreFlusher.java |   2 +-
 .../regionserver/FlushLargeStoresPolicy.java    |   5 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 250 ++++++++++--------
 .../hadoop/hbase/regionserver/HStore.java       |  92 +++----
 .../hbase/regionserver/HeapMemoryManager.java   |   6 +-
 .../hbase/regionserver/ImmutableSegment.java    |  46 ++--
 .../hadoop/hbase/regionserver/MemStore.java     |  57 ++--
 .../hbase/regionserver/MemStoreFlusher.java     |  29 +-
 .../hbase/regionserver/MemStoreSnapshot.java    |  14 +-
 .../hadoop/hbase/regionserver/MemstoreSize.java |  91 +++++++
 .../MetricsRegionServerWrapperImpl.java         |   2 +-
 .../regionserver/MetricsRegionWrapperImpl.java  |   2 +-
 .../hbase/regionserver/MutableSegment.java      |  67 ++++-
 .../hadoop/hbase/regionserver/Region.java       |   6 +-
 .../regionserver/RegionServerAccounting.java    |  54 ++--
 .../regionserver/RegionServicesForStores.java   |   9 +-
 .../hadoop/hbase/regionserver/Segment.java      |  71 +++--
 .../apache/hadoop/hbase/regionserver/Store.java |  23 ++
 .../hadoop/hbase/client/TestClientPushback.java |   6 +-
 .../regionserver/TestCompactingMemStore.java    | 263 +++++++++----------
 .../TestCompactingToCellArrayMapMemStore.java   | 131 +++++----
 .../hbase/regionserver/TestDefaultMemStore.java | 174 +++++-------
 .../hbase/regionserver/TestHMobStore.java       |  87 +++---
 .../hadoop/hbase/regionserver/TestHRegion.java  |  19 +-
 .../regionserver/TestHRegionReplayEvents.java   |   4 +-
 .../regionserver/TestMemStoreChunkPool.java     |  24 +-
 .../regionserver/TestPerColumnFamilyFlush.java  | 104 ++++----
 .../TestRegionMergeTransaction.java             |   5 +-
 .../regionserver/TestReversibleScanners.java    |   6 +-
 .../regionserver/TestSplitTransaction.java      |   5 +-
 .../hadoop/hbase/regionserver/TestStore.java    | 258 +++---------------
 .../TestWalAndCompactingMemStoreFlush.java      | 252 +++++++-----------
 .../regionserver/wal/AbstractTestWALReplay.java |   8 +-
 44 files changed, 1225 insertions(+), 1354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 7988352..484eebd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -381,6 +381,7 @@ public final class CellUtil {
   private static class TagRewriteCell implements ExtendedCell {
     protected Cell cell;
     protected byte[] tags;
+    private static final long HEAP_SIZE_OVERHEAD = 2 * ClassSize.REFERENCE + ClassSize.ARRAY;
 
     /**
      * @param cell The original Cell which it rewrites
@@ -552,6 +553,11 @@ public final class CellUtil {
       offset = Bytes.putAsShort(buf, offset, tagsLen);
       System.arraycopy(this.tags, 0, buf, offset, tagsLen);
     }
+
+    @Override
+    public long heapOverhead() {
+      return ((ExtendedCell) this.cell).heapOverhead() + HEAP_SIZE_OVERHEAD;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
index 420a5f9..f60da14 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ExtendedCell.java
@@ -66,4 +66,9 @@ public interface ExtendedCell extends Cell, SettableSequenceId, SettableTimestam
    * @param offset The offset within buffer, to write the Cell.
    */
   void write(byte[] buf, int offset);
+
+  /**
+   * @return The heap size overhead associated with this Cell.
+   */
+  long heapOverhead();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index f9a621a..b00ca1b 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -85,6 +85,11 @@ public class KeyValue implements ExtendedCell {
 
   private static final Log LOG = LogFactory.getLog(KeyValue.class);
 
+  public static final long FIXED_OVERHEAD = ClassSize.OBJECT + // the KeyValue object itself
+      ClassSize.REFERENCE + // pointer to "bytes"
+      2 * Bytes.SIZEOF_INT + // offset, length
+      Bytes.SIZEOF_LONG;// memstoreTS
+
   /**
    * Colon character in UTF-8
    */
@@ -2603,12 +2608,7 @@ public class KeyValue implements ExtendedCell {
    */
   @Override
   public long heapSize() {
-    int sum = 0;
-    sum += ClassSize.OBJECT;// the KeyValue object itself
-    sum += ClassSize.REFERENCE;// pointer to "bytes"
-    sum += 2 * Bytes.SIZEOF_INT;// offset, length
-    sum += Bytes.SIZEOF_LONG;// memstoreTS
-
+    long sum = FIXED_OVERHEAD;
     /*
      * Deep object overhead for this KV consists of two parts. The first part is the KV object
      * itself, while the second part is the backing byte[]. We will only count the array overhead
@@ -2812,5 +2812,15 @@ public class KeyValue implements ExtendedCell {
       // of Cell to be returned back over the RPC
       throw new IllegalStateException("A reader should never return this type of a Cell");
     }
+
+    @Override
+    public long heapOverhead() {
+      return super.heapOverhead() + Bytes.SIZEOF_SHORT;
+    }
+  }
+
+  @Override
+  public long heapOverhead() {
+    return FIXED_OVERHEAD;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
index ae2496b..2165362 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/OffheapKeyValue.java
@@ -42,7 +42,7 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
   private final boolean hasTags;
   // TODO : See if famLen can be cached or not?
 
-  private static final int FIXED_HEAP_SIZE_OVERHEAD = ClassSize.OBJECT + ClassSize.REFERENCE
+  private static final int FIXED_OVERHEAD = ClassSize.OBJECT + ClassSize.REFERENCE
       + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_SHORT
       + Bytes.SIZEOF_BOOLEAN + Bytes.SIZEOF_LONG;
 
@@ -235,7 +235,7 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
 
   @Override
   public long heapSize() {
-    return ClassSize.align(FIXED_HEAP_SIZE_OVERHEAD + ClassSize.align(length));
+    return ClassSize.align(FIXED_OVERHEAD + ClassSize.align(length));
   }
 
   @Override
@@ -276,4 +276,9 @@ public class OffheapKeyValue extends ByteBufferedCell implements ExtendedCell {
     // TODO when doing HBASE-15179
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public long heapOverhead() {
+    return FIXED_OVERHEAD;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java
index cccbae0..60c4cbf 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SizeCachedKeyValue.java
@@ -32,7 +32,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 @InterfaceAudience.Private
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS")
 public class SizeCachedKeyValue extends KeyValue {
-  private static final int HEAP_SIZE_OVERHEAD = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT;
+  // Overhead of this class alone. The parent's overhead is accounted for at the usage sites by
+  // calling the corresponding super methods.
+  private static final int FIXED_OVERHEAD = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT;
 
   private short rowLen;
   private int keyLen;
@@ -58,6 +60,11 @@ public class SizeCachedKeyValue extends KeyValue {
 
   @Override
   public long heapSize() {
-    return super.heapSize() + HEAP_SIZE_OVERHEAD;
+    return super.heapSize() + FIXED_OVERHEAD;
+  }
+
+  @Override
+  public long heapOverhead() {
+    return super.heapOverhead() + FIXED_OVERHEAD;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
index edecd9a..216a82d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java
@@ -468,6 +468,11 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
       // This is not used in actual flow. Throwing UnsupportedOperationException
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public long heapOverhead() {
+      return FIXED_OVERHEAD;
+    }
   }
 
   protected static class OffheapDecodedCell extends ByteBufferedCell implements ExtendedCell {
@@ -707,6 +712,11 @@ abstract class BufferedDataBlockEncoder extends AbstractDataBlockEncoder {
       // This is not used in actual flow. Throwing UnsupportedOperationException
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public long heapOverhead() {
+      return FIXED_OVERHEAD;
+    }
   }
 
   protected abstract static class BufferedEncodedSeeker<STATE extends SeekerState>

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index eb2564d..a5229b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -129,7 +129,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
       scanner.close();
     }
     LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize="
-        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
+        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) +
         ", hasBloomFilter=" + writer.hasGeneralBloom() +
         ", into tmp file " + writer.getPath());
     result.add(writer.getPath());

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
index 5544251..a4ea3ee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.SortedSet;
@@ -30,9 +28,6 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.ShareableMemory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
@@ -78,18 +73,6 @@ public abstract class AbstractMemStore implements MemStore {
     this.timeOfOldestEdit = Long.MAX_VALUE;
   }
 
-  /*
-  * Calculate how the MemStore size has changed.  Includes overhead of the
-  * backing Map.
-  * @param cell
-  * @param notPresent True if the cell was NOT present in the set.
-  * @return change in size
-  */
-  static long heapSizeChange(final Cell cell, final boolean notPresent) {
-    return notPresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
-        + CellUtil.estimatedHeapSizeOf(cell)) : 0;
-  }
-
   /**
    * Updates the wal with the lowest sequence id (oldest entry) that is still in memory
    * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or
@@ -98,22 +81,14 @@ public abstract class AbstractMemStore implements MemStore {
   public abstract void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfMoreRecent);
 
   @Override
-  public long add(Iterable<Cell> cells) {
-    long size = 0;
+  public void add(Iterable<Cell> cells, MemstoreSize memstoreSize) {
     for (Cell cell : cells) {
-      size += add(cell);
+      add(cell, memstoreSize);
     }
-    return size;
   }
-  
-  /**
-   * Write an update
-   * @param cell the cell to be added
-   * @return approximate size of the passed cell & newly added cell which maybe different than the
-   *         passed-in cell
-   */
+
   @Override
-  public long add(Cell cell) {
+  public void add(Cell cell, MemstoreSize memstoreSize) {
     Cell toAdd = maybeCloneWithAllocator(cell);
     boolean mslabUsed = (toAdd != cell);
     // This cell data is backed by the same byte[] where we read request in RPC(See HBASE-15180). By
@@ -128,7 +103,7 @@ public abstract class AbstractMemStore implements MemStore {
     if (!mslabUsed) {
       toAdd = deepCopyIfNeeded(toAdd);
     }
-    return internalAdd(toAdd, mslabUsed);
+    internalAdd(toAdd, mslabUsed, memstoreSize);
   }
 
   private static Cell deepCopyIfNeeded(Cell cell) {
@@ -141,31 +116,11 @@ public abstract class AbstractMemStore implements MemStore {
     return cell;
   }
 
-  /**
-   * Update or insert the specified Cells.
-   * <p>
-   * For each Cell, insert into MemStore.  This will atomically upsert the
-   * value for that row/family/qualifier.  If a Cell did already exist,
-   * it will then be removed.
-   * <p>
-   * Currently the memstoreTS is kept at 0 so as each insert happens, it will
-   * be immediately visible.  May want to change this so it is atomic across
-   * all Cells.
-   * <p>
-   * This is called under row lock, so Get operations will still see updates
-   * atomically.  Scans will only see each Cell update as atomic.
-   *
-   * @param cells the cells to be updated
-   * @param readpoint readpoint below which we can safely remove duplicate KVs
-   * @return change in memstore size
-   */
   @Override
-  public long upsert(Iterable<Cell> cells, long readpoint) {
-    long size = 0;
+  public void upsert(Iterable<Cell> cells, long readpoint, MemstoreSize memstoreSize) {
     for (Cell cell : cells) {
-      size += upsert(cell, readpoint);
+      upsert(cell, readpoint, memstoreSize);
     }
-    return size;
   }
 
   /**
@@ -176,18 +131,6 @@ public abstract class AbstractMemStore implements MemStore {
     return timeOfOldestEdit;
   }
 
-
-  /**
-   * Write a delete
-   * @param deleteCell the cell to be deleted
-   * @return approximate size of the passed key and value.
-   */
-  @Override
-  public long delete(Cell deleteCell) {
-    // Delete operation just adds the delete marker cell coming here.
-    return add(deleteCell);
-  }
-
   /**
    * The passed snapshot was successfully persisted; it can be let go.
    * @param id Id of the snapshot to clean out.
@@ -210,18 +153,9 @@ public abstract class AbstractMemStore implements MemStore {
     oldSnapshot.close();
   }
 
-  /**
-   * Get the entire heap usage for this MemStore not including keys in the
-   * snapshot.
-   */
-  @Override
-  public long heapSize() {
-    return size();
-  }
-
   @Override
-  public long getSnapshotSize() {
-    return this.snapshot.keySize();
+  public MemstoreSize getSnapshotSize() {
+    return new MemstoreSize(this.snapshot.keySize(), this.snapshot.heapOverhead());
   }
 
   @Override
@@ -249,7 +183,7 @@ public abstract class AbstractMemStore implements MemStore {
   }
 
 
-  /**
+  /*
    * Inserts the specified Cell into MemStore and deletes any existing
    * versions of the same row/family/qualifier as the specified Cell.
    * <p>
@@ -262,9 +196,9 @@ public abstract class AbstractMemStore implements MemStore {
    *
    * @param cell the cell to be updated
    * @param readpoint readpoint below which we can safely remove duplicate KVs
-   * @return change in size of MemStore
+   * @param memstoreSize accumulator that receives the resulting change in MemStore size
    */
-  private long upsert(Cell cell, long readpoint) {
+  private void upsert(Cell cell, long readpoint, MemstoreSize memstoreSize) {
     // Add the Cell to the MemStore
     // Use the internalAdd method here since we (a) already have a lock
     // and (b) cannot safely use the MSLAB here without potentially
@@ -275,50 +209,9 @@ public abstract class AbstractMemStore implements MemStore {
     // must do below deep copy. Or else we will keep referring to the bigger chunk of memory and
     // prevent it from getting GCed.
     cell = deepCopyIfNeeded(cell);
-    long addedSize = internalAdd(cell, false);
-
-    // Get the Cells for the row/family/qualifier regardless of timestamp.
-    // For this case we want to clean up any other puts
-    Cell firstCell = CellUtil.createFirstOnRow(
-        cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
-        cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
-        cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
-    SortedSet<Cell> ss = active.tailSet(firstCell);
-    Iterator<Cell> it = ss.iterator();
-    // versions visible to oldest scanner
-    int versionsVisible = 0;
-    while (it.hasNext()) {
-      Cell cur = it.next();
-
-      if (cell == cur) {
-        // ignore the one just put in
-        continue;
-      }
-      // check that this is the row and column we are interested in, otherwise bail
-      if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
-        // only remove Puts that concurrent scanners cannot possibly see
-        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
-            cur.getSequenceId() <= readpoint) {
-          if (versionsVisible >= 1) {
-            // if we get here we have seen at least one version visible to the oldest scanner,
-            // which means we can prove that no scanner will see this version
-
-            // false means there was a change, so give us the size.
-            long delta = heapSizeChange(cur, true);
-            addedSize -= delta;
-            active.incSize(-delta);
-            it.remove();
-            setOldestEditTimeToNow();
-          } else {
-            versionsVisible++;
-          }
-        }
-      } else {
-        // past the row or column, done
-        break;
-      }
-    }
-    return addedSize;
+    this.active.upsert(cell, readpoint, memstoreSize);
+    setOldestEditTimeToNow();
+    checkActiveSize();
   }
 
   /*
@@ -359,75 +252,23 @@ public abstract class AbstractMemStore implements MemStore {
     return result;
   }
 
-  /**
-   * Given the specs of a column, update it, first by inserting a new record,
-   * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
-   * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
-   * store will ensure that the insert/delete each are atomic. A scanner/reader will either
-   * get the new value, or the old value and all readers will eventually only see the new
-   * value after the old was removed.
-   */
-  @VisibleForTesting
-  @Override
-  public long updateColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long newValue, long now) {
-    Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
-    // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
-    Cell snc = snapshot.getFirstAfter(firstCell);
-    if(snc != null) {
-      // is there a matching Cell in the snapshot?
-      if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
-        if (snc.getTimestamp() == now) {
-          now += 1;
-        }
-      }
-    }
-    // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
-    // But the timestamp should also be max(now, mostRecentTsInMemstore)
-
-    // so we cant add the new Cell w/o knowing what's there already, but we also
-    // want to take this chance to delete some cells. So two loops (sad)
-
-    SortedSet<Cell> ss = this.active.tailSet(firstCell);
-    for (Cell cell : ss) {
-      // if this isnt the row we are interested in, then bail:
-      if (!CellUtil.matchingColumn(cell, family, qualifier)
-          || !CellUtil.matchingRow(cell, firstCell)) {
-        break; // rows dont match, bail.
-      }
-
-      // if the qualifier matches and it's a put, just RM it out of the active.
-      if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
-          cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
-        now = cell.getTimestamp();
-      }
-    }
-
-    // create or update (upsert) a new Cell with
-    // 'now' and a 0 memstoreTS == immediately visible
-    List<Cell> cells = new ArrayList<Cell>(1);
-    cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
-    return upsert(cells, 1L);
-  }
-
   private Cell maybeCloneWithAllocator(Cell cell) {
     return active.maybeCloneWithAllocator(cell);
   }
 
-  /**
+  /*
    * Internal version of add() that doesn't clone Cells with the
    * allocator, and doesn't take the lock.
    *
    * Callers should ensure they already have the read lock taken
    * @param toAdd the cell to add
    * @param mslabUsed whether using MSLAB
-   * @return the heap size change in bytes
+   * @param memstoreSize accumulator that receives the resulting size change
    */
-  private long internalAdd(final Cell toAdd, final boolean mslabUsed) {
-    long s = active.add(toAdd, mslabUsed);
+  private void internalAdd(final Cell toAdd, final boolean mslabUsed, MemstoreSize memstoreSize) {
+    active.add(toAdd, mslabUsed, memstoreSize);
     setOldestEditTimeToNow();
     checkActiveSize();
-    return s;
   }
 
   private void setOldestEditTimeToNow() {
@@ -437,11 +278,15 @@ public abstract class AbstractMemStore implements MemStore {
   }
 
   /**
-   * @return The size of the active segment. Means sum of all cell's size.
+   * @return The total size of cells in this memstore. We will not consider cells in the snapshot
    */
-  protected long keySize() {
-    return this.active.keySize();
-  }
+  protected abstract long keySize();
+
+  /**
+   * @return The total heap overhead of cells in this memstore. We will not consider cells in the
+   *         snapshot
+   */
+  protected abstract long heapOverhead();
 
   protected CellComparator getComparator() {
     return comparator;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 1ecd868..a7eb19e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -100,19 +100,19 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   /**
-   * @return Total memory occupied by this MemStore. This includes active segment size and heap size
-   *         overhead of this memstore but won't include any size occupied by the snapshot. We
-   *         assume the snapshot will get cleared soon. This is not thread safe and the memstore may
-   *         be changed while computing its size. It is the responsibility of the caller to make
-   *         sure this doesn't happen.
+   * @return Total memory occupied by this MemStore. This won't include any size occupied by the
+   *         snapshot. We assume the snapshot will get cleared soon. This is not thread safe and
+   *         the memstore may be changed while computing its size. It is the responsibility of the
+   *         caller to make sure this doesn't happen.
    */
   @Override
-  public long size() {
-    long res = DEEP_OVERHEAD + this.active.size();
+  public MemstoreSize size() {
+    MemstoreSize memstoreSize = new MemstoreSize();
+    memstoreSize.incMemstoreSize(this.active.keySize(), this.active.heapOverhead());
     for (Segment item : pipeline.getSegments()) {
-      res += CompactionPipeline.ENTRY_OVERHEAD + item.size();
+      memstoreSize.incMemstoreSize(item.keySize(), item.heapOverhead());
     }
-    return res;
+    return memstoreSize;
   }
 
   /**
@@ -163,13 +163,34 @@ public class CompactingMemStore extends AbstractMemStore {
    * @return size of data that is going to be flushed
    */
   @Override
-  public long getFlushableSize() {
-    long snapshotSize = getSnapshotSize();
-    if (snapshotSize == 0) {
+  public MemstoreSize getFlushableSize() {
+    MemstoreSize snapshotSize = getSnapshotSize();
+    if (snapshotSize.getDataSize() == 0) {
       // if snapshot is empty the tail of the pipeline is flushed
       snapshotSize = pipeline.getTailSize();
     }
-    return snapshotSize > 0 ? snapshotSize : keySize();
+    return snapshotSize.getDataSize() > 0 ? snapshotSize
+        : new MemstoreSize(this.active.keySize(), this.active.heapOverhead());
+  }
+
+  @Override
+  protected long keySize() {
+    // Need to consider keySize of all segments in pipeline and active
+    long k = this.active.keySize();
+    for (Segment segment : this.pipeline.getSegments()) {
+      k += segment.keySize();
+    }
+    return k;
+  }
+
+  @Override
+  protected long heapOverhead() {
+    // Need to consider heapOverhead of all segments in pipeline and active
+    long h = this.active.heapOverhead();
+    for (Segment segment : this.pipeline.getSegments()) {
+      h += segment.heapOverhead();
+    }
+    return h;
   }
 
   @Override
@@ -318,7 +339,7 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   private boolean shouldFlushInMemory() {
-    if (this.active.size() > inmemoryFlushSize) { // size above flush threshold
+    if (this.active.keySize() > inmemoryFlushSize) { // size above flush threshold
         // the inMemoryFlushInProgress is CASed to be true here in order to mutual exclude
         // the insert of the active into the compaction pipeline
         return (inMemoryFlushInProgress.compareAndSet(false,true));
@@ -419,7 +440,7 @@ public class CompactingMemStore extends AbstractMemStore {
 
   // debug method
   public void debug() {
-    String msg = "active size=" + this.active.size();
+    String msg = "active size=" + this.active.keySize();
     msg += " threshold="+IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT* inmemoryFlushSize;
     msg += " allow compaction is "+ (allowCompaction.get() ? "true" : "false");
     msg += " inMemoryFlushInProgress is "+ (inMemoryFlushInProgress.get() ? "true" : "false");

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 28c9383..6676170 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -115,19 +115,30 @@ public class CompactionPipeline {
     }
     if (region != null) {
       // update the global memstore size counter
-      long suffixSize = getSegmentsKeySize(suffix);
-      long newSize = segment.keySize();
-      long delta = suffixSize - newSize;
-      assert ( closeSuffix || delta>0 ); // sanity check
-      long globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
+      long suffixDataSize = getSegmentsKeySize(suffix);
+      long newDataSize = segment.keySize();
+      long dataSizeDelta = suffixDataSize - newDataSize;
+      long suffixHeapOverhead = getSegmentsHeapOverhead(suffix);
+      long newHeapOverhead = segment.heapOverhead();
+      long heapOverheadDelta = suffixHeapOverhead - newHeapOverhead;
+      region.addMemstoreSize(new MemstoreSize(-dataSizeDelta, -heapOverheadDelta));
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Suffix size: " + suffixSize + " compacted item size: " + newSize
-            + " globalMemstoreSize: " + globalMemstoreSize);
+        LOG.debug("Suffix data size: " + suffixDataSize + " compacted item data size: "
+            + newDataSize + ". Suffix heap overhead: " + suffixHeapOverhead
+            + " compacted item heap overhead: " + newHeapOverhead);
       }
     }
     return true;
   }
 
+  private static long getSegmentsHeapOverhead(List<? extends Segment> list) {
+    long res = 0;
+    for (Segment segment : list) {
+      res += segment.heapOverhead();
+    }
+    return res;
+  }
+
   private static long getSegmentsKeySize(List<? extends Segment> list) {
     long res = 0;
     for (Segment segment : list) {
@@ -160,16 +171,12 @@ public class CompactionPipeline {
 
       for (ImmutableSegment s : pipeline) {
         // remember the old size in case this segment is going to be flattened
-        long sizeBeforeFlat = s.keySize();
-        long globalMemstoreSize = 0;
-        if (s.flatten()) {
+        MemstoreSize memstoreSize = new MemstoreSize();
+        if (s.flatten(memstoreSize)) {
           if(region != null) {
-            long sizeAfterFlat = s.keySize();
-            long delta = sizeBeforeFlat - sizeAfterFlat;
-            globalMemstoreSize = region.addAndGetGlobalMemstoreSize(-delta);
+            region.addMemstoreSize(memstoreSize);
           }
-          LOG.debug("Compaction pipeline segment " + s + " was flattened; globalMemstoreSize: "
-              + globalMemstoreSize);
+          LOG.debug("Compaction pipeline segment " + s + " was flattened");
           return true;
         }
       }
@@ -203,9 +210,9 @@ public class CompactionPipeline {
     return minSequenceId;
   }
 
-  public long getTailSize() {
-    if (isEmpty()) return 0;
-    return pipeline.peekLast().keySize();
+  public MemstoreSize getTailSize() {
+    if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
+    return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
   }
 
   private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index b448b04..d4e6e12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -104,9 +104,20 @@ public class DefaultMemStore extends AbstractMemStore {
    * @return size of data that is going to be flushed from active set
    */
   @Override
-  public long getFlushableSize() {
-    long snapshotSize = getSnapshotSize();
-    return snapshotSize > 0 ? snapshotSize : keySize();
+  public MemstoreSize getFlushableSize() {
+    MemstoreSize snapshotSize = getSnapshotSize();
+    return snapshotSize.getDataSize() > 0 ? snapshotSize
+        : new MemstoreSize(keySize(), heapOverhead());
+  }
+
+  @Override
+  protected long keySize() {
+    return this.active.keySize();
+  }
+
+  @Override
+  protected long heapOverhead() {
+    return this.active.heapOverhead();
   }
 
   @Override
@@ -144,8 +155,8 @@ public class DefaultMemStore extends AbstractMemStore {
   }
 
   @Override
-  public long size() {
-    return this.active.size() + DEEP_OVERHEAD;
+  public MemstoreSize size() {
+    return new MemstoreSize(this.active.keySize(), this.active.heapOverhead());
   }
 
   /**
@@ -179,26 +190,30 @@ public class DefaultMemStore extends AbstractMemStore {
     LOG.info("vmInputArguments=" + runtime.getInputArguments());
     DefaultMemStore memstore1 = new DefaultMemStore();
     // TODO: x32 vs x64
-    long size = 0;
     final int count = 10000;
     byte [] fam = Bytes.toBytes("col");
     byte [] qf = Bytes.toBytes("umn");
     byte [] empty = new byte[0];
+    MemstoreSize memstoreSize = new MemstoreSize();
     for (int i = 0; i < count; i++) {
       // Give each its own ts
-      size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize);
     }
-    LOG.info("memstore1 estimated size=" + size);
+    LOG.info("memstore1 estimated size="
+        + (memstoreSize.getDataSize() + memstoreSize.getHeapOverhead()));
     for (int i = 0; i < count; i++) {
-      size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
+      memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty), memstoreSize);
     }
-    LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
+    LOG.info("memstore1 estimated size (2nd loading of same data)="
+        + (memstoreSize.getDataSize() + memstoreSize.getHeapOverhead()));
     // Make a variably sized memstore.
     DefaultMemStore memstore2 = new DefaultMemStore();
+    memstoreSize = new MemstoreSize();
     for (int i = 0; i < count; i++) {
-      size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]));
+      memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]), memstoreSize);
     }
-    LOG.info("memstore2 estimated size=" + size);
+    LOG.info("memstore2 estimated size="
+        + (memstoreSize.getDataSize() + memstoreSize.getHeapOverhead()));
     final int seconds = 30;
     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
     LOG.info("Exiting.");

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 079501e..93837b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -89,7 +89,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
       scanner.close();
     }
     LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
-        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getSize(), "", 1) +
+        + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) +
         ", hasBloomFilter=" + writer.hasGeneralBloom() +
         ", into tmp file " + writer.getPath());
     result.add(writer.getPath());

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
index 49cb747..119fdb5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -78,11 +78,12 @@ public abstract class FlushLargeStoresPolicy extends FlushPolicy {
   }
 
   protected boolean shouldFlush(Store store) {
-    if (store.getMemStoreSize() > this.flushSizeLowerBound) {
+    if (store.getSizeOfMemStore().getDataSize() > this.flushSizeLowerBound) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " +
             region.getRegionInfo().getEncodedName() + " because memstoreSize=" +
-            store.getMemStoreSize() + " > lower bound=" + this.flushSizeLowerBound);
+            store.getSizeOfMemStore().getDataSize() + " > lower bound="
+            + this.flushSizeLowerBound);
       }
       return true;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 99c389d..fff2e6f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -259,7 +259,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // TODO: account for each registered handler in HeapSize computation
   private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();
 
-  private final AtomicLong memstoreSize = new AtomicLong(0);
+  private final AtomicLong memstoreDataSize = new AtomicLong(0);// Track data size in all memstores
   private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this);
 
   // Debug possible data loss due to WAL off
@@ -506,23 +506,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     final FlushResult result; // indicating a failure result from prepare
     final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
     final TreeMap<byte[], List<Path>> committedFiles;
-    final TreeMap<byte[], Long> storeFlushableSize;
+    final TreeMap<byte[], MemstoreSize> storeFlushableSize;
     final long startTime;
     final long flushOpSeqId;
     final long flushedSeqId;
-    final long totalFlushableSize;
+    final MemstoreSize totalFlushableSize;
 
     /** Constructs an early exit case */
     PrepareFlushResult(FlushResult result, long flushSeqId) {
-      this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0);
+      this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, new MemstoreSize());
     }
 
     /** Constructs a successful prepare flush result */
     PrepareFlushResult(
       TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
       TreeMap<byte[], List<Path>> committedFiles,
-      TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
-      long flushedSeqId, long totalFlushableSize) {
+      TreeMap<byte[], MemstoreSize> storeFlushableSize, long startTime, long flushSeqId,
+      long flushedSeqId, MemstoreSize totalFlushableSize) {
       this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
         flushSeqId, flushedSeqId, totalFlushableSize);
     }
@@ -531,8 +531,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       FlushResult result,
       TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
       TreeMap<byte[], List<Path>> committedFiles,
-      TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
-      long flushedSeqId, long totalFlushableSize) {
+      TreeMap<byte[], MemstoreSize> storeFlushableSize, long startTime, long flushSeqId,
+      long flushedSeqId, MemstoreSize totalFlushableSize) {
       this.result = result;
       this.storeFlushCtxs = storeFlushCtxs;
       this.committedFiles = committedFiles;
@@ -1125,19 +1125,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * store
    * @return the size of memstore in this region
    */
-  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
+  public long addAndGetMemstoreSize(MemstoreSize memstoreSize) {
     if (this.rsAccounting != null) {
-      rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
+      rsAccounting.incGlobalMemstoreSize(memstoreSize);
     }
-    long size = this.memstoreSize.addAndGet(memStoreSize);
+    long size = this.memstoreDataSize.addAndGet(memstoreSize.getDataSize());
+    checkNegativeMemstoreDataSize(size, memstoreSize.getDataSize());
+    return size;
+  }
+
+  public void decrMemstoreSize(MemstoreSize memstoreSize) {
+    if (this.rsAccounting != null) {
+      rsAccounting.decGlobalMemstoreSize(memstoreSize);
+    }
+    long size = this.memstoreDataSize.addAndGet(-memstoreSize.getDataSize());
+    checkNegativeMemstoreDataSize(size, -memstoreSize.getDataSize());
+  }
+
+  private void checkNegativeMemstoreDataSize(long memstoreDataSize, long delta) {
     // This is extremely bad if we make memstoreSize negative. Log as much info on the offending
     // caller as possible. (memStoreSize might be a negative value already -- freeing memory)
-    if (size < 0) {
+    if (memstoreDataSize < 0) {
       LOG.error("Asked to modify this region's (" + this.toString()
-      + ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
-      + (size-memStoreSize) + ", delta=" + memStoreSize, new Exception());
+          + ") memstoreSize to a negative value which is incorrect. Current memstoreSize="
+          + (memstoreDataSize - delta) + ", delta=" + delta, new Exception());
     }
-    return size;
   }
 
   @Override
@@ -1180,7 +1192,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public long getMemstoreSize() {
-    return memstoreSize.get();
+    return memstoreDataSize.get();
   }
 
   @Override
@@ -1490,7 +1502,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         int failedfFlushCount = 0;
         int flushCount = 0;
         long tmp = 0;
-        long remainingSize = this.memstoreSize.get();
+        long remainingSize = this.memstoreDataSize.get();
         while (remainingSize > 0) {
           try {
             internalFlushcache(status);
@@ -1499,7 +1511,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                   " (carrying snapshot?) " + this);
             }
             flushCount++;
-            tmp = this.memstoreSize.get();
+            tmp = this.memstoreDataSize.get();
             if (tmp >= remainingSize) {
               failedfFlushCount++;
             }
@@ -1534,8 +1546,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
         // close each store in parallel
         for (final Store store : stores.values()) {
-          long flushableSize = store.getFlushableSize();
-          if (!(abort || flushableSize == 0 || writestate.readOnly)) {
+          MemstoreSize flushableSize = store.getSizeToFlush();
+          if (!(abort || flushableSize.getDataSize() == 0 || writestate.readOnly)) {
             if (getRegionServerServices() != null) {
               getRegionServerServices().abort("Assertion failed while closing store "
                 + getRegionInfo().getRegionNameAsString() + " " + store
@@ -1580,9 +1592,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       this.closed.set(true);
       if (!canFlush) {
-        addAndGetGlobalMemstoreSize(-memstoreSize.get());
-      } else if (memstoreSize.get() != 0) {
-        LOG.error("Memstore size is " + memstoreSize.get());
+        this.decrMemstoreSize(new MemstoreSize(memstoreDataSize.get(), getMemstoreHeapOverhead()));
+      } else if (memstoreDataSize.get() != 0) {
+        LOG.error("Memstore size is " + memstoreDataSize.get());
       }
       if (coprocessorHost != null) {
         status.setStatus("Running coprocessor post-close hooks");
@@ -1605,6 +1617,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  private long getMemstoreHeapOverhead() {
+    long overhead = 0;
+    for (Store s : this.stores.values()) {
+      overhead += s.getSizeOfMemStore().getHeapOverhead();
+    }
+    return overhead;
+  }
+
   @Override
   public void waitForFlushesAndCompactions() {
     synchronized (writestate) {
@@ -1670,7 +1690,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     * @return True if its worth doing a flush before we put up the close flag.
     */
   private boolean worthPreFlushing() {
-    return this.memstoreSize.get() >
+    return this.memstoreDataSize.get() >
       this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
   }
 
@@ -2246,12 +2266,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // bulk loaded file between memory and existing hfiles. It wants a good sequenceId that belongs
     // to no other that it can use to associate with the bulk load. Hence this little dance below
     // to go get one.
-    if (this.memstoreSize.get() <= 0) {
+    if (this.memstoreDataSize.get() <= 0) {
       // Take an update lock so no edits can come into memory just yet.
       this.updatesLock.writeLock().lock();
       WriteEntry writeEntry = null;
       try {
-        if (this.memstoreSize.get() <= 0) {
+        if (this.memstoreDataSize.get() <= 0) {
           // Presume that if there are still no edits in the memstore, then there are no edits for
           // this region out in the WAL subsystem so no need to do any trickery clearing out
           // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for
@@ -2294,7 +2314,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // block waiting for the lock for internal flush
     this.updatesLock.writeLock().lock();
     status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
-    long totalFlushableSizeOfFlushableStores = 0;
+    MemstoreSize totalSizeOfFlushableStores = new MemstoreSize();
 
     Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
     for (Store store: storesToFlush) {
@@ -2305,8 +2325,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
     TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
         Bytes.BYTES_COMPARATOR);
-    TreeMap<byte[], Long> storeFlushableSize
-        = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    TreeMap<byte[], MemstoreSize> storeFlushableSize
+        = new TreeMap<byte[], MemstoreSize>(Bytes.BYTES_COMPARATOR);
     // The sequence id of this flush operation which is used to log FlushMarker and pass to
     // createFlushContext to use as the store file's sequence id. It can be in advance of edits
     // still in the memstore, edits that are in other column families yet to be flushed.
@@ -2338,10 +2358,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       for (Store s : storesToFlush) {
-        totalFlushableSizeOfFlushableStores += s.getFlushableSize();
+        MemstoreSize flushableSize = s.getSizeToFlush();
+        totalSizeOfFlushableStores.incMemstoreSize(flushableSize);
         storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
         committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
-        storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
+        storeFlushableSize.put(s.getFamily().getName(), flushableSize);
       }
 
       // write the snapshot start to WAL
@@ -2364,11 +2385,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       this.updatesLock.writeLock().unlock();
     }
     String s = "Finished memstore snapshotting " + this + ", syncing WAL and waiting on mvcc, " +
-        "flushsize=" + totalFlushableSizeOfFlushableStores;
+        "flushsize=" + totalSizeOfFlushableStores;
     status.setStatus(s);
     doSyncOfUnflushedWALChanges(wal, getRegionInfo());
     return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
-        flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores);
+        flushOpSeqId, flushedSeqId, totalSizeOfFlushableStores);
   }
 
   /**
@@ -2384,11 +2405,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       perCfExtras = new StringBuilder();
       for (Store store: storesToFlush) {
         perCfExtras.append("; ").append(store.getColumnFamilyName());
-        perCfExtras.append("=").append(StringUtils.byteDesc(store.getFlushableSize()));
+        perCfExtras.append("=")
+            .append(StringUtils.byteDesc(store.getSizeToFlush().getDataSize()));
       }
     }
     LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
-        " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
+        " column families, memstore=" + StringUtils.byteDesc(this.memstoreDataSize.get()) +
         ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
         ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId));
   }
@@ -2468,7 +2490,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     long startTime = prepareResult.startTime;
     long flushOpSeqId = prepareResult.flushOpSeqId;
     long flushedSeqId = prepareResult.flushedSeqId;
-    long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
 
     String s = "Flushing stores of " + this;
     status.setStatus(s);
@@ -2504,14 +2525,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         committedFiles.put(storeName, storeCommittedFiles);
         // Flush committed no files, indicating flush is empty or flush was canceled
         if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
-          totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
+          MemstoreSize storeFlushableSize = prepareResult.storeFlushableSize.get(storeName);
+          prepareResult.totalFlushableSize.decMemstoreSize(storeFlushableSize);
         }
         flushedOutputFileSize += flush.getOutputFileSize();
       }
       storeFlushCtxs.clear();
 
       // Set down the memstore size by amount of flush.
-      this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
+      this.decrMemstoreSize(prepareResult.totalFlushableSize);
 
       if (wal != null) {
         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
@@ -2581,10 +2603,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     long time = EnvironmentEdgeManager.currentTime() - startTime;
-    long memstoresize = this.memstoreSize.get();
+    long memstoresize = this.memstoreDataSize.get();
     String msg = "Finished memstore flush of ~"
-        + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
-        + totalFlushableSizeOfFlushableStores + ", currentsize="
+        + StringUtils.byteDesc(prepareResult.totalFlushableSize.getDataSize()) + "/"
+        + prepareResult.totalFlushableSize.getDataSize() + ", currentsize="
         + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
         + " for region " + this + " in " + time + "ms, sequenceid="
         + flushOpSeqId +  ", compaction requested=" + compactionRequested
@@ -2594,7 +2616,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     if (rsServices != null && rsServices.getMetrics() != null) {
       rsServices.getMetrics().updateFlush(time - startTime,
-        totalFlushableSizeOfFlushableStores, flushedOutputFileSize);
+          prepareResult.totalFlushableSize.getDataSize(), flushedOutputFileSize);
     }
 
     return new FlushResultImpl(compactionRequested ?
@@ -3026,10 +3048,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return Change in size brought about by applying <code>batchOp</code>
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK",
-		  justification="Findbugs seems to be confused on this.")
+      justification="Findbugs seems to be confused on this.")
   @SuppressWarnings("unchecked")
   // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120
-  private long doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
+  private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
     boolean replay = batchOp.isInReplay();
     // Variable to note if all Put items are for the same CF -- metrics related
     boolean putsCfSetConsistent = true;
@@ -3055,7 +3077,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     int cellCount = 0;
     /** Keep track of the locks we hold so we can release them in finally clause */
     List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
-    long addedSize = 0;
+    MemstoreSize memstoreSize = new MemstoreSize();
     try {
       // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
       int numReadyToWrite = 0;
@@ -3117,7 +3139,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
       if (numReadyToWrite <= 0) {
-        return 0L;
+        return;
       }
 
       for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) {
@@ -3155,7 +3177,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
           batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
         if (coprocessorHost.preBatchMutate(miniBatchOp)) {
-          return 0L;
+          return;
         } else {
           for (int i = firstIndex; i < lastIndexExclusive; i++) {
             if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
@@ -3303,7 +3325,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           this.updateSequenceId(familyMaps[i].values(),
             replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
         }
-        addedSize += applyFamilyMapToMemstore(familyMaps[i]);
+        applyFamilyMapToMemstore(familyMaps[i], memstoreSize);
       }
 
       // STEP 6. Complete mvcc.
@@ -3355,11 +3377,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
 
       success = true;
-      return addedSize;
     } finally {
       // Call complete rather than completeAndWait because we probably had error if walKey != null
       if (writeEntry != null) mvcc.complete(writeEntry);
-      this.addAndGetGlobalMemstoreSize(addedSize);
+      this.addAndGetMemstoreSize(memstoreSize);
       if (locked) {
         this.updatesLock.readLock().unlock();
       }
@@ -3778,7 +3799,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // If catalog region, do not impose resource constraints or block updates.
     if (this.getRegionInfo().isMetaRegion()) return;
 
-    if (this.memstoreSize.get() > this.blockingMemStoreSize) {
+    if (this.memstoreDataSize.get() > this.blockingMemStoreSize) {
       blockedRequestsCount.increment();
       requestFlush();
       throw new RegionTooBusyException("Above memstore limit, " +
@@ -3786,7 +3807,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           this.getRegionInfo().getRegionNameAsString()) +
           ", server=" + (this.getRegionServerServices() == null ? "unknown" :
           this.getRegionServerServices().getServerName()) +
-          ", memstoreSize=" + memstoreSize.get() +
+          ", memstoreSize=" + memstoreDataSize.get() +
           ", blockingMemStoreSize=" + blockingMemStoreSize);
     }
   }
@@ -3831,57 +3852,53 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     doBatchMutate(p);
   }
 
-  /**
+  /*
    * Atomically apply the given map of family->edits to the memstore.
    * This handles the consistency control on its own, but the caller
    * should already have locked updatesLock.readLock(). This also does
    * <b>not</b> check the families for validity.
    *
    * @param familyMap Map of Cells by family
-   * @return the additional memory usage of the memstore caused by the new entries.
+   * @param memstoreSize accumulator handed to each store's add/upsert to record the size change
    */
-  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap)
-  throws IOException {
-    long size = 0;
+  private void applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
+      MemstoreSize memstoreSize) throws IOException {
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
       byte[] family = e.getKey();
       List<Cell> cells = e.getValue();
       assert cells instanceof RandomAccess;
-      size += applyToMemstore(getStore(family), cells, false);
+      applyToMemstore(getStore(family), cells, false, memstoreSize);
     }
-    return size;
   }
 
-  /**
+  /*
    * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
    *  set; when set we will run operations that make sense in the increment/append scenario but
    *  that do not make sense otherwise.
-   * @return Memstore change in size on insert of these Cells.
    * @see #applyToMemstore(Store, Cell, long)
    */
-  private long applyToMemstore(final Store store, final List<Cell> cells, final boolean delta)
-  throws IOException {
+  private void applyToMemstore(final Store store, final List<Cell> cells, final boolean delta,
+      MemstoreSize memstoreSize) throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
     boolean upsert = delta && store.getFamily().getMaxVersions() == 1;
     if (upsert) {
-      return ((HStore) store).upsert(cells, getSmallestReadPoint());
+      ((HStore) store).upsert(cells, getSmallestReadPoint(), memstoreSize);
     } else {
-      return ((HStore) store).add(cells);
+      ((HStore) store).add(cells, memstoreSize);
     }
   }
 
-  /**
-   * @return Memstore change in size on insert of these Cells.
+  /*
    * @see #applyToMemstore(Store, List, boolean, boolean, long)
    */
-  private long applyToMemstore(final Store store, final Cell cell)
+  private void applyToMemstore(final Store store, final Cell cell, MemstoreSize memstoreSize)
   throws IOException {
     // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
     if (store == null) {
       checkFamily(CellUtil.cloneFamily(cell));
       // Unreachable because checkFamily will throw exception
     }
-    return ((HStore) store).add(cell);
+    ((HStore) store).add(cell, memstoreSize);
   }
 
   @Override
@@ -4200,6 +4217,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
 
           boolean flush = false;
+          MemstoreSize memstoreSize = new MemstoreSize();
           for (Cell cell: val.getCells()) {
             // Check this edit is for me. Also, guard against writing the special
             // METACOLUMN info such as HBASE::CACHEFLUSH entries
@@ -4241,12 +4259,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             }
             CellUtil.setSequenceId(cell, currentReplaySeqId);
 
-            // Once we are over the limit, restoreEdit will keep returning true to
-            // flush -- but don't flush until we've played all the kvs that make up
-            // the WALEdit.
-            flush |= restoreEdit(store, cell);
+            restoreEdit(store, cell, memstoreSize);
             editsCount++;
           }
+          if (this.rsAccounting != null) {
+            rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(),
+                memstoreSize);
+          }
+          flush = isFlushSize(this.addAndGetMemstoreSize(memstoreSize));
           if (flush) {
             internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
           }
@@ -4555,7 +4575,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             replayFlushInStores(flush, prepareFlushResult, true);
 
             // Set down the memstore size by amount of flush.
-            this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
+            this.decrMemstoreSize(prepareFlushResult.totalFlushableSize);
 
             this.prepareFlushResult = null;
             writestate.flushing = false;
@@ -4588,7 +4608,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             replayFlushInStores(flush, prepareFlushResult, true);
 
             // Set down the memstore size by amount of flush.
-            this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);
+            this.decrMemstoreSize(prepareFlushResult.totalFlushableSize);
 
             // Inspect the memstore contents to see whether the memstore contains only edits
             // with seqId smaller than the flush seqId. If so, we can discard those edits.
@@ -4691,8 +4711,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * if the memstore edits have seqNums smaller than the given seq id
    * @throws IOException
    */
-  private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
-    long totalFreedSize = 0;
+  private MemstoreSize dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
+    MemstoreSize totalFreedSize = new MemstoreSize();
     this.updatesLock.writeLock().lock();
     try {
 
@@ -4706,10 +4726,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // Prepare flush (take a snapshot) and then abort (drop the snapshot)
         if (store == null) {
           for (Store s : stores.values()) {
-            totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
+            totalFreedSize.incMemstoreSize(doDropStoreMemstoreContentsForSeqId(s, currentSeqId));
           }
         } else {
-          totalFreedSize += doDropStoreMemstoreContentsForSeqId(store, currentSeqId);
+          totalFreedSize.incMemstoreSize(doDropStoreMemstoreContentsForSeqId(store, currentSeqId));
         }
       } else {
         LOG.info(getRegionInfo().getEncodedName() + " : "
@@ -4722,13 +4742,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return totalFreedSize;
   }
 
-  private long doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
-    long snapshotSize = s.getFlushableSize();
-    this.addAndGetGlobalMemstoreSize(-snapshotSize);
+  private MemstoreSize doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId)
+      throws IOException {
+    MemstoreSize flushableSize = s.getSizeToFlush();
+    this.decrMemstoreSize(flushableSize);
     StoreFlushContext ctx = s.createFlushContext(currentSeqId);
     ctx.prepare();
     ctx.abort();
-    return snapshotSize;
+    return flushableSize;
   }
 
   private void replayWALFlushAbortMarker(FlushDescriptor flush) {
@@ -4841,9 +4862,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                   null : this.prepareFlushResult.storeFlushCtxs.get(family);
               if (ctx != null) {
-                long snapshotSize = store.getFlushableSize();
+                MemstoreSize snapshotSize = store.getSizeToFlush();
                 ctx.abort();
-                this.addAndGetGlobalMemstoreSize(-snapshotSize);
+                this.decrMemstoreSize(snapshotSize);
                 this.prepareFlushResult.storeFlushCtxs.remove(family);
               }
             }
@@ -4972,7 +4993,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           if (store == null) {
             continue;
           }
-          if (store.getSnapshotSize() > 0) {
+          if (store.getSizeOfSnapshot().getDataSize() > 0) {
             canDrop = false;
             break;
           }
@@ -5005,7 +5026,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           + "Refreshing store files to see whether we can free up memstore");
     }
 
-    long totalFreedSize = 0;
+    long totalFreedDataSize = 0;
 
     long smallestSeqIdInStores = Long.MAX_VALUE;
 
@@ -5035,11 +5056,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                 StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                     null : this.prepareFlushResult.storeFlushCtxs.get(store.getFamily().getName());
                 if (ctx != null) {
-                  long snapshotSize = store.getFlushableSize();
+                  MemstoreSize snapshotSize = store.getSizeToFlush();
                   ctx.abort();
-                  this.addAndGetGlobalMemstoreSize(-snapshotSize);
+                  this.decrMemstoreSize(snapshotSize);
                   this.prepareFlushResult.storeFlushCtxs.remove(store.getFamily().getName());
-                  totalFreedSize += snapshotSize;
+                  totalFreedDataSize += snapshotSize.getDataSize();
                 }
               }
             }
@@ -5071,7 +5092,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (!force) {
           for (Map.Entry<Store, Long> entry : map.entrySet()) {
             // Drop the memstore contents if they are now smaller than the latest seen flushed file
-            totalFreedSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey());
+            totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())
+                .getDataSize();
           }
         } else {
           synchronized (storeSeqIds) {
@@ -5085,7 +5107,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       synchronized (this) {
         notifyAll(); // FindBugs NN_NAKED_NOTIFY
       }
-      return totalFreedSize > 0;
+      return totalFreedDataSize > 0;
     } finally {
       closeRegionOperation();
     }
@@ -5124,18 +5146,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       + " does not match this region: " + this.getRegionInfo());
   }
 
-  /**
+  /*
    * Used by tests
    * @param s Store to add edit to.
    * @param cell Cell to add.
-   * @return True if we should flush.
+   * @param memstoreSize
    */
-  protected boolean restoreEdit(final HStore s, final Cell cell) {
-    long kvSize = s.add(cell);
-    if (this.rsAccounting != null) {
-      rsAccounting.addAndGetRegionReplayEditsSize(getRegionInfo().getRegionName(), kvSize);
-    }
-    return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
+  protected void restoreEdit(final HStore s, final Cell cell, MemstoreSize memstoreSize) {
+    s.add(cell, memstoreSize);
   }
 
   /*
@@ -6986,7 +7004,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return null;
     }
     ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
-    stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
+    stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreDataSize.get() * 100) / this
         .memstoreFlushSize)));
     stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
     stats.setCompactionPressure((int)rsServices.getCompactionPressure()*100 > 100 ? 100 :
@@ -7035,12 +7053,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     boolean locked;
     List<RowLock> acquiredRowLocks;
-    long addedSize = 0;
     List<Mutation> mutations = new ArrayList<Mutation>();
     Collection<byte[]> rowsToLock = processor.getRowsToLock();
     // This is assigned by mvcc either explicitly in the below or in the guts of the WAL append
     // when it assigns the edit a sequencedid (A.K.A the mvcc write number).
     WriteEntry writeEntry = null;
+    MemstoreSize memstoreSize = new MemstoreSize();
     try {
       // STEP 2. Acquire the row lock(s)
       acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
@@ -7084,7 +7102,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                 // If no WAL, need to stamp it here.
                 CellUtil.setSequenceId(cell, sequenceId);
               }
-              addedSize += applyToMemstore(getHStore(cell), cell);
+              applyToMemstore(getHStore(cell), cell, memstoreSize);
             }
           }
           // STEP 8. Complete mvcc.
@@ -7119,7 +7137,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     } finally {
       closeRegionOperation();
       if (!mutations.isEmpty()) {
-        long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
+        long newSize = this.addAndGetMemstoreSize(memstoreSize);
         requestFlushIfNeeded(newSize);
       }
     }
@@ -7203,7 +7221,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // See HBASE-16304
   @SuppressWarnings("unchecked")
   private void dropMemstoreContents() throws IOException {
-    long totalFreedSize = 0;
+    MemstoreSize totalFreedSize = new MemstoreSize();
     while (!storeSeqIds.isEmpty()) {
       Map<Store, Long> map = null;
       synchronized (storeSeqIds) {
@@ -7212,11 +7230,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       for (Map.Entry<Store, Long> entry : map.entrySet()) {
         // Drop the memstore contents if they are now smaller than the latest seen flushed file
-        totalFreedSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey());
+        totalFreedSize
+            .incMemstoreSize(dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()));
       }
     }
-    if (totalFreedSize > 0) {
-      LOG.debug("Freed " + totalFreedSize + " bytes from memstore");
+    if (totalFreedSize.getDataSize() > 0) {
+      LOG.debug("Freed " + totalFreedSize.getDataSize() + " bytes from memstore");
     }
   }
 
@@ -7237,8 +7256,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * on the passed in <code>op</code> to do increment or append specific paths.
    */
   private Result doDelta(Operation op, Mutation mutation, long nonceGroup, long nonce,
-      boolean returnResults)
-  throws IOException {
+      boolean returnResults) throws IOException {
     checkReadOnly();
     checkResources();
     checkRow(mutation.getRow(), op.toString());
@@ -7246,9 +7264,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.writeRequestsCount.increment();
     WriteEntry writeEntry = null;
     startRegionOperation(op);
-    long accumulatedResultSize = 0;
     List<Cell> results = returnResults? new ArrayList<Cell>(mutation.size()): null;
     RowLock rowLock = getRowLockInternal(mutation.getRow(), false);
+    MemstoreSize memstoreSize = new MemstoreSize();
     try {
       lock(this.updatesLock.readLock());
       try {
@@ -7273,8 +7291,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
         }
         // Now write to MemStore. Do it a column family at a time.
-        for (Map.Entry<Store, List<Cell>> e: forMemStore.entrySet()) {
-          accumulatedResultSize += applyToMemstore(e.getKey(), e.getValue(), true);
+        for (Map.Entry<Store, List<Cell>> e : forMemStore.entrySet()) {
+          applyToMemstore(e.getKey(), e.getValue(), true, memstoreSize);
         }
         mvcc.completeAndWait(writeEntry);
         if (rsServices != null && rsServices.getNonceManager() != null) {
@@ -7299,7 +7317,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       if (writeEntry != null) mvcc.complete(writeEntry);
       rowLock.release();
       // Request a cache flush if over the limit.  Do it outside update lock.
-      if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush();
+      if (isFlushSize(addAndGetMemstoreSize(memstoreSize))) {
+        requestFlush();
+      }
       closeRegionOperation(op);
       if (this.metricsRegion != null) {
         switch (op) {
@@ -8155,7 +8175,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     for (Store s : getStores()) {
       buf.append(s.getFamily().getNameAsString());
       buf.append(" size: ");
-      buf.append(s.getMemStoreSize());
+      buf.append(s.getSizeOfMemStore().getDataSize());
       buf.append(" ");
     }
     buf.append("end-of-stores");

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index d7b0d36..1dcf060 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
@@ -362,12 +361,26 @@ public class HStore implements Store {
   }
 
   @Override
+  @Deprecated
   public long getFlushableSize() {
+    MemstoreSize size = getSizeToFlush();
+    return size.getDataSize() + size.getHeapOverhead();
+  }
+
+  @Override
+  public MemstoreSize getSizeToFlush() {
     return this.memstore.getFlushableSize();
   }
 
   @Override
+  @Deprecated
   public long getSnapshotSize() {
+    MemstoreSize size = getSizeOfSnapshot();
+    return size.getDataSize() + size.getHeapOverhead();
+  }
+
+  @Override
+  public MemstoreSize getSizeOfSnapshot() {
     return this.memstore.getSnapshotSize();
   }
 
@@ -636,12 +649,12 @@ public class HStore implements Store {
   /**
    * Adds a value to the memstore
    * @param cell
-   * @return memstore size delta
+   * @param memstoreSize
    */
-  public long add(final Cell cell) {
+  public void add(final Cell cell, MemstoreSize memstoreSize) {
     lock.readLock().lock();
     try {
-       return this.memstore.add(cell);
+       this.memstore.add(cell, memstoreSize);
     } finally {
       lock.readLock().unlock();
     }
@@ -650,12 +663,12 @@ public class HStore implements Store {
   /**
    * Adds the specified value to the memstore
    * @param cells
-   * @return memstore size delta
+   * @param memstoreSize
    */
-  public long add(final Iterable<Cell> cells) {
+  public void add(final Iterable<Cell> cells, MemstoreSize memstoreSize) {
     lock.readLock().lock();
     try {
-      return memstore.add(cells);
+      memstore.add(cells, memstoreSize);
     } finally {
       lock.readLock().unlock();
     }
@@ -667,21 +680,6 @@ public class HStore implements Store {
   }
 
   /**
-   * Adds a value to the memstore
-   *
-   * @param kv
-   * @return memstore size delta
-   */
-  protected long delete(final KeyValue kv) {
-    lock.readLock().lock();
-    try {
-      return this.memstore.delete(kv);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  /**
    * @return All store files.
    */
   @Override
@@ -2026,7 +2024,14 @@ public class HStore implements Store {
   }
 
   @Override
+  @Deprecated
   public long getMemStoreSize() {
+    MemstoreSize size = getSizeOfMemStore();
+    return size.getDataSize() + size.getHeapOverhead();
+  }
+
+  @Override
+  public MemstoreSize getSizeOfMemStore() {
     return this.memstore.size();
   }
 
@@ -2069,37 +2074,6 @@ public class HStore implements Store {
   }
 
   /**
-   * Updates the value for the given row/family/qualifier. This function will always be seen as
-   * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
-   * control necessary.
-   * @param row row to update
-   * @param f family to update
-   * @param qualifier qualifier to update
-   * @param newValue the new value to set into memstore
-   * @return memstore size delta
-   * @throws IOException
-   */
-  @VisibleForTesting
-  public long updateColumnValue(byte [] row, byte [] f,
-                                byte [] qualifier, long newValue)
-      throws IOException {
-
-    this.lock.readLock().lock();
-    try {
-      long now = EnvironmentEdgeManager.currentTime();
-
-      return this.memstore.updateColumnValue(row,
-          f,
-          qualifier,
-          newValue,
-          now);
-
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /**
    * Adds or replaces the specified KeyValues.
    * <p>
    * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
@@ -2109,13 +2083,14 @@ public class HStore implements Store {
    * across all of them.
    * @param cells
    * @param readpoint readpoint below which we can safely remove duplicate KVs
-   * @return memstore size delta
+   * @param memstoreSize
    * @throws IOException
    */
-  public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
+  public void upsert(Iterable<Cell> cells, long readpoint, MemstoreSize memstoreSize)
+      throws IOException {
     this.lock.readLock().lock();
     try {
-      return this.memstore.upsert(cells, readpoint);
+      this.memstore.upsert(cells, readpoint, memstoreSize);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -2149,7 +2124,7 @@ public class HStore implements Store {
       // passing the current sequence number of the wal - to allow bookkeeping in the memstore
       this.snapshot = memstore.snapshot();
       this.cacheFlushCount = snapshot.getCellsCount();
-      this.cacheFlushSize = snapshot.getSize();
+      this.cacheFlushSize = snapshot.getDataSize();
       committedFiles = new ArrayList<Path>(1);
     }
 
@@ -2282,7 +2257,8 @@ public class HStore implements Store {
 
   @Override
   public long heapSize() {
-    return DEEP_OVERHEAD + this.memstore.heapSize();
+    MemstoreSize memstoreSize = this.memstore.size();
+    return DEEP_OVERHEAD + memstoreSize.getDataSize() + memstoreSize.getHeapOverhead();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index 6226cf2..7646293 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -298,8 +298,10 @@ public class HeapMemoryManager {
       metricsHeapMemoryManager.updateUnblockedFlushCount(unblockedFlushCnt);
       tunerContext.setCurBlockCacheUsed((float) blockCache.getCurrentSize() / maxHeapSize);
       metricsHeapMemoryManager.setCurBlockCacheSizeGauge(blockCache.getCurrentSize());
-      tunerContext.setCurMemStoreUsed((float)regionServerAccounting.getGlobalMemstoreSize() / maxHeapSize);
-      metricsHeapMemoryManager.setCurMemStoreSizeGauge(regionServerAccounting.getGlobalMemstoreSize());
+      long globalMemstoreHeapSize = regionServerAccounting.getGlobalMemstoreSize()
+          + regionServerAccounting.getGlobalMemstoreHeapOverhead();
+      tunerContext.setCurMemStoreUsed((float) globalMemstoreHeapSize / maxHeapSize);
+      metricsHeapMemoryManager.setCurMemStoreSizeGauge(globalMemstoreHeapSize);
       tunerContext.setCurBlockCacheSize(blockCachePercent);
       tunerContext.setCurMemStoreSize(globalMemStorePercent);
       TunerResult result = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ba6d9523/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
index 8e79ad5..4cdb29d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java
@@ -21,7 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.client.Scan;
@@ -108,13 +109,14 @@ public class ImmutableSegment extends Segment {
     super(new CellSet(comparator), // initiailize the CellSet with empty CellSet
         comparator, memStoreLAB);
     type = Type.SKIPLIST_MAP_BASED;
+    MemstoreSize memstoreSize = new MemstoreSize();
     while (iterator.hasNext()) {
       Cell c = iterator.next();
       // The scanner is doing all the elimination logic
       // now we just copy it to the new segment
       Cell newKV = maybeCloneWithAllocator(c);
       boolean usedMSLAB = (newKV != c);
-      internalAdd(newKV, usedMSLAB); //
+      internalAdd(newKV, usedMSLAB, memstoreSize);
     }
     this.timeRange = this.timeRangeTracker == null ? null : this.timeRangeTracker.toTimeRange();
   }
@@ -140,19 +142,6 @@ public class ImmutableSegment extends Segment {
     return this.timeRange.getMin();
   }
 
-
-  @Override
-  public long size() {
-    switch (this.type) {
-    case SKIPLIST_MAP_BASED:
-      return keySize() + DEEP_OVERHEAD_CSLM;
-    case ARRAY_MAP_BASED:
-      return keySize() + DEEP_OVERHEAD_CAM;
-    default:
-      throw new RuntimeException("Unknown type " + type);
-    }
-  }
-
   /**------------------------------------------------------------------------
    * Change the CellSet of this ImmutableSegment from one based on ConcurrentSkipListMap to one
    * based on CellArrayMap.
@@ -164,7 +153,7 @@ public class ImmutableSegment extends Segment {
    * thread of compaction, but to be on the safe side the initial CellSet is locally saved
    * before the flattening and then replaced using CAS instruction.
    */
-  public boolean flatten() {
+  public boolean flatten(MemstoreSize memstoreSize) {
     if (isFlat()) return false;
     CellSet oldCellSet = getCellSet();
     int numOfCells = getCellsCount();
@@ -176,12 +165,13 @@ public class ImmutableSegment extends Segment {
 
     // arrange the meta-data size, decrease all meta-data sizes related to SkipList
     // (recreateCellArrayMapSet doesn't take the care for the sizes)
-    long newSegmentSizeDelta = -(ClassSize.CONCURRENT_SKIPLISTMAP +
-        numOfCells * ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
+    long newSegmentSizeDelta = -(numOfCells * ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY);
     // add size of CellArrayMap and meta-data overhead per Cell
-    newSegmentSizeDelta = newSegmentSizeDelta + ClassSize.CELL_ARRAY_MAP +
-        numOfCells * ClassSize.CELL_ARRAY_MAP_ENTRY;
-    incSize(newSegmentSizeDelta);
+    newSegmentSizeDelta = newSegmentSizeDelta + numOfCells * ClassSize.CELL_ARRAY_MAP_ENTRY;
+    incSize(0, newSegmentSizeDelta);
+    if (memstoreSize != null) {
+      memstoreSize.incMemstoreSize(0, newSegmentSizeDelta);
+    }
 
     return true;
   }
@@ -208,7 +198,7 @@ public class ImmutableSegment extends Segment {
       boolean useMSLAB = (getMemStoreLAB()!=null);
       // second parameter true, because in compaction addition of the cell to new segment
       // is always successful
-      updateMetaInfo(c, true, useMSLAB); // updates the size per cell
+      updateMetaInfo(c, true, useMSLAB, null); // updates the size per cell
       i++;
     }
     // build the immutable CellSet
@@ -216,14 +206,18 @@ public class ImmutableSegment extends Segment {
     return new CellSet(cam);
   }
 
-  protected long heapSizeChange(Cell cell, boolean succ) {
+  @Override
+  protected long heapOverheadChange(Cell cell, boolean succ) {
     if (succ) {
       switch (this.type) {
       case SKIPLIST_MAP_BASED:
-        return ClassSize
-            .align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
+        return super.heapOverheadChange(cell, succ);
       case ARRAY_MAP_BASED:
-        return ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + CellUtil.estimatedHeapSizeOf(cell));
+        if (cell instanceof ExtendedCell) {
+          return ClassSize
+              .align(ClassSize.CELL_ARRAY_MAP_ENTRY + ((ExtendedCell) cell).heapOverhead());
+        }
+        return ClassSize.align(ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD);
       }
     }
     return 0;


Mime
View raw message