hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anoopsamj...@apache.org
Subject svn commit: r1577541 [2/3] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/io/ test/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/wal/
Date Fri, 14 Mar 2014 14:07:10 GMT
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Mar 14 14:07:10 2014
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,168 +15,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.regionserver;
 
-import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
 import java.rmi.UnexpectedException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.NavigableSet;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
- * The MemStore holds in-memory modifications to the Store.  Modifications
- * are {@link KeyValue}s.  When asked to flush, current memstore is moved
- * to snapshot and is cleared.  We continue to serve edits out of new memstore
- * and backing snapshot until flusher reports in that the flush succeeded. At
- * this point we let the snapshot go.
- *  <p>
- * The MemStore functions should not be called in parallel. Callers should hold
- *  write and read locks. This is done in {@link HStore}.
- *  </p>
- *
- * TODO: Adjust size of the memstore when we remove items because they have
- * been deleted.
- * TODO: With new KVSLS, need to make sure we update HeapSize with difference
- * in KV size.
+ * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s.
+ * <p>
+ * The MemStore functions should not be called in parallel. Callers should hold write and read
+ * locks. This is done in {@link HStore}.
+ * </p>
  */
 @InterfaceAudience.Private
-public class MemStore implements HeapSize {
-  private static final Log LOG = LogFactory.getLog(MemStore.class);
-
-  static final String USEMSLAB_KEY =
-    "hbase.hregion.memstore.mslab.enabled";
-  private static final boolean USEMSLAB_DEFAULT = true;
-
-  private Configuration conf;
-
-  // MemStore.  Use a KeyValueSkipListSet rather than SkipListSet because of the
-  // better semantics.  The Map will overwrite if passed a key it already had
-  // whereas the Set will not add new KV if key is same though value might be
-  // different.  Value is not important -- just make sure always same
-  // reference passed.
-  volatile KeyValueSkipListSet kvset;
-
-  // Snapshot of memstore.  Made for flusher.
-  volatile KeyValueSkipListSet snapshot;
-
-  final KeyValue.KVComparator comparator;
-
-  // Used to track own heapSize
-  final AtomicLong size;
-  private volatile long snapshotSize;
-
-  // Used to track when to flush
-  volatile long timeOfOldestEdit = Long.MAX_VALUE;
-
-  TimeRangeTracker timeRangeTracker;
-  TimeRangeTracker snapshotTimeRangeTracker;
-
-  MemStoreChunkPool chunkPool;
-  volatile MemStoreLAB allocator;
-  volatile MemStoreLAB snapshotAllocator;
-
-  /**
-   * Default constructor. Used for tests.
-   */
-  public MemStore() {
-    this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
-  }
-
-  /**
-   * Constructor.
-   * @param c Comparator
-   */
-  public MemStore(final Configuration conf,
-                  final KeyValue.KVComparator c) {
-    this.conf = conf;
-    this.comparator = c;
-    this.kvset = new KeyValueSkipListSet(c);
-    this.snapshot = new KeyValueSkipListSet(c);
-    timeRangeTracker = new TimeRangeTracker();
-    snapshotTimeRangeTracker = new TimeRangeTracker();
-    this.size = new AtomicLong(DEEP_OVERHEAD);
-    this.snapshotSize = 0;
-    if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
-      this.chunkPool = MemStoreChunkPool.getPool(conf);
-      this.allocator = new MemStoreLAB(conf, chunkPool);
-    } else {
-      this.allocator = null;
-      this.chunkPool = null;
-    }
-  }
-
-  void dump() {
-    for (KeyValue kv: this.kvset) {
-      LOG.info(kv);
-    }
-    for (KeyValue kv: this.snapshot) {
-      LOG.info(kv);
-    }
-  }
+public interface MemStore extends HeapSize {
 
   /**
-   * Creates a snapshot of the current memstore.
-   * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet)}
-   * To get the snapshot made by this method, use {@link #getSnapshot()}
+   * Creates a snapshot of the current memstore. Snapshot must be cleared by call to
+   * {@link #clearSnapshot(long)}.
+   * @return {@link MemStoreSnapshot}
    */
-  void snapshot() {
-    // If snapshot currently has entries, then flusher failed or didn't call
-    // cleanup.  Log a warning.
-    if (!this.snapshot.isEmpty()) {
-      LOG.warn("Snapshot called again without clearing previous. " +
-          "Doing nothing. Another ongoing flush or did we fail last attempt?");
-    } else {
-      if (!this.kvset.isEmpty()) {
-        this.snapshotSize = keySize();
-        this.snapshot = this.kvset;
-        this.kvset = new KeyValueSkipListSet(this.comparator);
-        this.snapshotTimeRangeTracker = this.timeRangeTracker;
-        this.timeRangeTracker = new TimeRangeTracker();
-        // Reset heap to not include any keys
-        this.size.set(DEEP_OVERHEAD);
-        this.snapshotAllocator = this.allocator;
-        // Reset allocator so we get a fresh buffer for the new memstore
-        if (allocator != null) {
-          this.allocator = new MemStoreLAB(conf, chunkPool);
-        }
-        timeOfOldestEdit = Long.MAX_VALUE;
-      }
-    }
-  }
+  MemStoreSnapshot snapshot();
 
   /**
-   * Return the current snapshot.
-   * Called by flusher to get current snapshot made by a previous
-   * call to {@link #snapshot()}
-   * @return Return snapshot.
+   * Clears the current snapshot of the Memstore.
+   * @param id
    * @see #snapshot()
-   * @see #clearSnapshot(SortedSet)
    */
-  KeyValueSkipListSet getSnapshot() {
-    return this.snapshot;
-  }
+  void clearSnapshot(long id) throws UnexpectedException;
 
   /**
    * On flush, how much memory we will clear.
@@ -187,271 +56,42 @@ public class MemStore implements HeapSiz
    *
    * @return size of data that is going to be flushed
    */
-  long getFlushableSize() {
-    return this.snapshotSize > 0 ? this.snapshotSize : keySize();
-  }
-
-  /**
-   * The passed snapshot was successfully persisted; it can be let go.
-   * @param ss The snapshot to clean out.
-   * @throws UnexpectedException
-   * @see #snapshot()
-   */
-  void clearSnapshot(final SortedSet<KeyValue> ss)
-  throws UnexpectedException {
-    MemStoreLAB tmpAllocator = null;
-    if (this.snapshot != ss) {
-      throw new UnexpectedException("Current snapshot is " +
-          this.snapshot + ", was passed " + ss);
-    }
-    // OK. Passed in snapshot is same as current snapshot.  If not-empty,
-    // create a new snapshot and let the old one go.
-    if (!ss.isEmpty()) {
-      this.snapshot = new KeyValueSkipListSet(this.comparator);
-      this.snapshotTimeRangeTracker = new TimeRangeTracker();
-    }
-    this.snapshotSize = 0;
-    if (this.snapshotAllocator != null) {
-      tmpAllocator = this.snapshotAllocator;
-      this.snapshotAllocator = null;
-    }
-    if (tmpAllocator != null) {
-      tmpAllocator.close();
-    }
-  }
+  long getFlushableSize();
 
   /**
    * Write an update
-   * @param kv
+   * @param cell
    * @return approximate size of the passed key and value.
    */
-  long add(final KeyValue kv) {
-    KeyValue toAdd = maybeCloneWithAllocator(kv);
-    return internalAdd(toAdd);
-  }
-
-  long timeOfOldestEdit() {
-    return timeOfOldestEdit;
-  }
-
-  private boolean addToKVSet(KeyValue e) {
-    boolean b = this.kvset.add(e);
-    setOldestEditTimeToNow();
-    return b;
-  }
-
-  private boolean removeFromKVSet(KeyValue e) {
-    boolean b = this.kvset.remove(e);
-    setOldestEditTimeToNow();
-    return b;
-  }
-
-  void setOldestEditTimeToNow() {
-    if (timeOfOldestEdit == Long.MAX_VALUE) {
-      timeOfOldestEdit = EnvironmentEdgeManager.currentTimeMillis();
-    }
-  }
+  long add(final Cell cell);
 
   /**
-   * Internal version of add() that doesn't clone KVs with the
-   * allocator, and doesn't take the lock.
-   *
-   * Callers should ensure they already have the read lock taken
+   * @return Time at which the oldest edit in the MemStore was added (not a Cell timestamp)
    */
-  private long internalAdd(final KeyValue toAdd) {
-    long s = heapSizeChange(toAdd, addToKVSet(toAdd));
-    timeRangeTracker.includeTimestamp(toAdd);
-    this.size.addAndGet(s);
-    return s;
-  }
-
-  private KeyValue maybeCloneWithAllocator(KeyValue kv) {
-    if (allocator == null) {
-      return kv;
-    }
-
-    int len = kv.getLength();
-    Allocation alloc = allocator.allocateBytes(len);
-    if (alloc == null) {
-      // The allocation was too large, allocator decided
-      // not to do anything with it.
-      return kv;
-    }
-    assert alloc.getData() != null;
-    System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
-    KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
-    newKv.setMvccVersion(kv.getMvccVersion());
-    return newKv;
-  }
+  long timeOfOldestEdit();
 
   /**
-   * Remove n key from the memstore. Only kvs that have the same key and the
-   * same memstoreTS are removed.  It is ok to not update timeRangeTracker
-   * in this call. It is possible that we can optimize this method by using
-   * tailMap/iterator, but since this method is called rarely (only for
-   * error recovery), we can leave those optimization for the future.
-   * @param kv
+   * Remove a key from the memstore. Only kvs that have the same key and the same memstoreTS are
+   * removed. It is ok to not update timeRangeTracker in this call.
+   * @param cell
    */
-  void rollback(final KeyValue kv) {
-    // If the key is in the snapshot, delete it. We should not update
-    // this.size, because that tracks the size of only the memstore and
-    // not the snapshot. The flush of this snapshot to disk has not
-    // yet started because Store.flush() waits for all rwcc transactions to
-    // commit before starting the flush to disk.
-    KeyValue found = this.snapshot.get(kv);
-    if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
-      this.snapshot.remove(kv);
-    }
-    // If the key is in the memstore, delete it. Update this.size.
-    found = this.kvset.get(kv);
-    if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
-      removeFromKVSet(kv);
-      long s = heapSizeChange(kv, true);
-      this.size.addAndGet(-s);
-    }
-  }
+  void rollback(final Cell cell);
 
   /**
    * Write a delete
-   * @param delete
+   * @param deleteCell
    * @return approximate size of the passed key and value.
    */
-  long delete(final KeyValue delete) {
-    long s = 0;
-    KeyValue toAdd = maybeCloneWithAllocator(delete);
-    s += heapSizeChange(toAdd, addToKVSet(toAdd));
-    timeRangeTracker.includeTimestamp(toAdd);
-    this.size.addAndGet(s);
-    return s;
-  }
-
-  /**
-   * @param kv Find the row that comes after this one.  If null, we return the
-   * first.
-   * @return Next row or null if none found.
-   */
-  KeyValue getNextRow(final KeyValue kv) {
-    return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
-  }
-
-  /*
-   * @param a
-   * @param b
-   * @return Return lowest of a or b or null if both a and b are null
-   */
-  private KeyValue getLowest(final KeyValue a, final KeyValue b) {
-    if (a == null) {
-      return b;
-    }
-    if (b == null) {
-      return a;
-    }
-    return comparator.compareRows(a, b) <= 0? a: b;
-  }
-
-  /*
-   * @param key Find row that follows this one.  If null, return first.
-   * @param map Set to look in for a row beyond <code>row</code>.
-   * @return Next row or null if none found.  If one found, will be a new
-   * KeyValue -- can be destroyed by subsequent calls to this method.
-   */
-  private KeyValue getNextRow(final KeyValue key,
-      final NavigableSet<KeyValue> set) {
-    KeyValue result = null;
-    SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
-    // Iterate until we fall into the next row; i.e. move off current row
-    for (KeyValue kv: tail) {
-      if (comparator.compareRows(kv, key) <= 0)
-        continue;
-      // Note: Not suppressing deletes or expired cells.  Needs to be handled
-      // by higher up functions.
-      result = kv;
-      break;
-    }
-    return result;
-  }
+  long delete(final Cell deleteCell);
 
   /**
+   * Find the key that matches <i>row</i> exactly, or the one that immediately precedes it. The
+   * target row key is set in state.
    * @param state column/delete tracking state
    */
-  void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
-    getRowKeyAtOrBefore(kvset, state);
-    getRowKeyAtOrBefore(snapshot, state);
-  }
-
-  /*
-   * @param set
-   * @param state Accumulates deletes and candidates.
-   */
-  private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
-      final GetClosestRowBeforeTracker state) {
-    if (set.isEmpty()) {
-      return;
-    }
-    if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
-      // Found nothing in row.  Try backing up.
-      getRowKeyBefore(set, state);
-    }
-  }
-
-  /*
-   * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
-   * we have been passed the first possible key on a row.  As we walk forward
-   * we accumulate deletes until we hit a candidate on the row at which point
-   * we return.
-   * @param set
-   * @param firstOnRow First possible key on this row.
-   * @param state
-   * @return True if we found a candidate walking this row.
-   */
-  private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
-      final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
-    boolean foundCandidate = false;
-    SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
-    if (tail.isEmpty()) return foundCandidate;
-    for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
-      KeyValue kv = i.next();
-      // Did we go beyond the target row? If so break.
-      if (state.isTooFar(kv, firstOnRow)) break;
-      if (state.isExpired(kv)) {
-        i.remove();
-        continue;
-      }
-      // If we added something, this row is a contender. break.
-      if (state.handle(kv)) {
-        foundCandidate = true;
-        break;
-      }
-    }
-    return foundCandidate;
-  }
-
-  /*
-   * Walk backwards through the passed set a row at a time until we run out of
-   * set or until we get a candidate.
-   * @param set
-   * @param state
-   */
-  private void getRowKeyBefore(NavigableSet<KeyValue> set,
-      final GetClosestRowBeforeTracker state) {
-    KeyValue firstOnRow = state.getTargetKey();
-    for (Member p = memberOfPreviousRow(set, state, firstOnRow);
-        p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
-      // Make sure we don't fall out of our table.
-      if (!state.isTargetTable(p.kv)) break;
-      // Stop looking if we've exited the better candidate range.
-      if (!state.isBetterCandidate(p.kv)) break;
-      // Make into firstOnRow
-      firstOnRow = new KeyValue(p.kv.getRowArray(), p.kv.getRowOffset(), p.kv.getRowLength(),
-          HConstants.LATEST_TIMESTAMP);
-      // If we find something, break;
-      if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
-    }
-  }
+  void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state);
 
   /**
-   * Only used by tests. TODO: Remove
-   *
    * Given the specs of a column, update it, first by inserting a new record,
    * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
@@ -464,616 +104,35 @@ public class MemStore implements HeapSiz
    * @param qualifier
    * @param newValue
    * @param now
-   * @return  Timestamp
+   * @return Timestamp
    */
-  long updateColumnValue(byte[] row,
-                                byte[] family,
-                                byte[] qualifier,
-                                long newValue,
-                                long now) {
-    KeyValue firstKv = KeyValue.createFirstOnRow(
-        row, family, qualifier);
-    // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
-    SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
-    if (!snSs.isEmpty()) {
-      KeyValue snKv = snSs.first();
-      // is there a matching KV in the snapshot?
-      if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
-        if (snKv.getTimestamp() == now) {
-          // poop,
-          now += 1;
-        }
-      }
-    }
-
-    // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
-    // But the timestamp should also be max(now, mostRecentTsInMemstore)
-
-    // so we cant add the new KV w/o knowing what's there already, but we also
-    // want to take this chance to delete some kvs. So two loops (sad)
-
-    SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
-    for (KeyValue kv : ss) {
-      // if this isnt the row we are interested in, then bail:
-      if (!kv.matchingColumn(family, qualifier) || !kv.matchingRow(firstKv)) {
-        break; // rows dont match, bail.
-      }
-
-      // if the qualifier matches and it's a put, just RM it out of the kvset.
-      if (kv.getTypeByte() == KeyValue.Type.Put.getCode() &&
-          kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
-        now = kv.getTimestamp();
-      }
-    }
-
-    // create or update (upsert) a new KeyValue with
-    // 'now' and a 0 memstoreTS == immediately visible
-    List<Cell> cells = new ArrayList<Cell>(1);
-    cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
-    return upsert(cells, 1L);
-  }
+  long updateColumnValue(byte[] row, byte[] family, byte[] qualifier, long newValue, long now);
 
   /**
-   * Update or insert the specified KeyValues.
+   * Update or insert the specified cells.
    * <p>
-   * For each KeyValue, insert into MemStore.  This will atomically upsert the
-   * value for that row/family/qualifier.  If a KeyValue did already exist,
-   * it will then be removed.
+   * For each Cell, insert into MemStore. This will atomically upsert the value for that
+   * row/family/qualifier. If a Cell did already exist, it will then be removed.
    * <p>
-   * Currently the memstoreTS is kept at 0 so as each insert happens, it will
-   * be immediately visible.  May want to change this so it is atomic across
-   * all KeyValues.
+   * Currently the memstoreTS is kept at 0 so as each insert happens, it will be immediately
+   * visible. May want to change this so it is atomic across all Cells.
    * <p>
-   * This is called under row lock, so Get operations will still see updates
-   * atomically.  Scans will only see each KeyValue update as atomic.
-   *
+   * This is called under row lock, so Get operations will still see updates atomically. Scans will
+   * only see each Cell update as atomic.
    * @param cells
-   * @param readpoint readpoint below which we can safely remove duplicate KVs 
+   * @param readpoint readpoint below which we can safely remove duplicate Cells.
    * @return change in memstore size
    */
-  public long upsert(Iterable<Cell> cells, long readpoint) {
-    long size = 0;
-    for (Cell cell : cells) {
-      size += upsert(cell, readpoint);
-    }
-    return size;
-  }
-
-  /**
-   * Inserts the specified KeyValue into MemStore and deletes any existing
-   * versions of the same row/family/qualifier as the specified KeyValue.
-   * <p>
-   * First, the specified KeyValue is inserted into the Memstore.
-   * <p>
-   * If there are any existing KeyValues in this MemStore with the same row,
-   * family, and qualifier, they are removed.
-   * <p>
-   * Callers must hold the read lock.
-   *
-   * @param cell
-   * @return change in size of MemStore
-   */
-  private long upsert(Cell cell, long readpoint) {
-    // Add the KeyValue to the MemStore
-    // Use the internalAdd method here since we (a) already have a lock
-    // and (b) cannot safely use the MSLAB here without potentially
-    // hitting OOME - see TestMemStore.testUpsertMSLAB for a
-    // test that triggers the pathological case if we don't avoid MSLAB
-    // here.
-    KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-    long addedSize = internalAdd(kv);
-
-    // Get the KeyValues for the row/family/qualifier regardless of timestamp.
-    // For this case we want to clean up any other puts
-    KeyValue firstKv = KeyValue.createFirstOnRow(
-        kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
-        kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
-        kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength());
-    SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
-    Iterator<KeyValue> it = ss.iterator();
-    // versions visible to oldest scanner
-    int versionsVisible = 0;
-    while ( it.hasNext() ) {
-      KeyValue cur = it.next();
-
-      if (kv == cur) {
-        // ignore the one just put in
-        continue;
-      }
-      // check that this is the row and column we are interested in, otherwise bail
-      if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
-        // only remove Puts that concurrent scanners cannot possibly see
-        if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
-            cur.getMvccVersion() <= readpoint) {
-          if (versionsVisible > 1) {
-            // if we get here we have seen at least one version visible to the oldest scanner,
-            // which means we can prove that no scanner will see this version
-
-            // false means there was a change, so give us the size.
-            long delta = heapSizeChange(cur, true);
-            addedSize -= delta;
-            this.size.addAndGet(-delta);
-            it.remove();
-            setOldestEditTimeToNow();
-          } else {
-            versionsVisible++;
-          }
-        }
-      } else {
-        // past the row or column, done
-        break;
-      }
-    }
-    return addedSize;
-  }
-
-  /*
-   * Immutable data structure to hold member found in set and the set it was
-   * found in.  Include set because it is carrying context.
-   */
-  private static class Member {
-    final KeyValue kv;
-    final NavigableSet<KeyValue> set;
-    Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
-      this.kv = kv;
-      this.set = s;
-    }
-  }
-
-  /*
-   * @param set Set to walk back in.  Pass a first in row or we'll return
-   * same row (loop).
-   * @param state Utility and context.
-   * @param firstOnRow First item on the row after the one we want to find a
-   * member in.
-   * @return Null or member of row previous to <code>firstOnRow</code>
-   */
-  private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
-      final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
-    NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
-    if (head.isEmpty()) return null;
-    for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
-      KeyValue found = i.next();
-      if (state.isExpired(found)) {
-        i.remove();
-        continue;
-      }
-      return new Member(head, found);
-    }
-    return null;
-  }
-
-  /**
-   * @return scanner on memstore and snapshot in this order.
-   */
-  List<KeyValueScanner> getScanners(long readPt) {
-    return Collections.<KeyValueScanner>singletonList(
-        new MemStoreScanner(readPt));
-  }
-
-  /**
-   * Check if this memstore may contain the required keys
-   * @param scan
-   * @return False if the key definitely does not exist in this Memstore
-   */
-  public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
-    return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
-        snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
-        && (Math.max(timeRangeTracker.getMaximumTimestamp(),
-                     snapshotTimeRangeTracker.getMaximumTimestamp()) >=
-            oldestUnexpiredTS);
-  }
-
-  public TimeRangeTracker getSnapshotTimeRangeTracker() {
-    return this.snapshotTimeRangeTracker;
-  }
-
-  /*
-   * MemStoreScanner implements the KeyValueScanner.
-   * It lets the caller scan the contents of a memstore -- both current
-   * map and snapshot.
-   * This behaves as if it were a real scanner but does not maintain position.
-   */
-  protected class MemStoreScanner extends NonLazyKeyValueScanner {
-    // Next row information for either kvset or snapshot
-    private KeyValue kvsetNextRow = null;
-    private KeyValue snapshotNextRow = null;
-
-    // last iterated KVs for kvset and snapshot (to restore iterator state after reseek)
-    private KeyValue kvsetItRow = null;
-    private KeyValue snapshotItRow = null;
-    
-    // iterator based scanning.
-    private Iterator<KeyValue> kvsetIt;
-    private Iterator<KeyValue> snapshotIt;
-
-    // The kvset and snapshot at the time of creating this scanner
-    private KeyValueSkipListSet kvsetAtCreation;
-    private KeyValueSkipListSet snapshotAtCreation;
-
-    // the pre-calculated KeyValue to be returned by peek() or next()
-    private KeyValue theNext;
-
-    // The allocator and snapshot allocator at the time of creating this scanner
-    volatile MemStoreLAB allocatorAtCreation;
-    volatile MemStoreLAB snapshotAllocatorAtCreation;
-    
-    // A flag represents whether could stop skipping KeyValues for MVCC
-    // if have encountered the next row. Only used for reversed scan
-    private boolean stopSkippingKVsIfNextRow = false;
-
-    private long readPoint;
-
-    /*
-    Some notes...
-
-     So memstorescanner is fixed at creation time. this includes pointers/iterators into
-    existing kvset/snapshot.  during a snapshot creation, the kvset is null, and the
-    snapshot is moved.  since kvset is null there is no point on reseeking on both,
-      we can save us the trouble. During the snapshot->hfile transition, the memstore
-      scanner is re-created by StoreScanner#updateReaders().  StoreScanner should
-      potentially do something smarter by adjusting the existing memstore scanner.
-
-      But there is a greater problem here, that being once a scanner has progressed
-      during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
-      if a scan lasts a little while, there is a chance for new entries in kvset to
-      become available but we will never see them.  This needs to be handled at the
-      StoreScanner level with coordination with MemStoreScanner.
-
-      Currently, this problem is only partly managed: during the small amount of time
-      when the StoreScanner has not yet created a new MemStoreScanner, we will miss
-      the adds to kvset in the MemStoreScanner.
-    */
-
-    MemStoreScanner(long readPoint) {
-      super();
-
-      this.readPoint = readPoint;
-      kvsetAtCreation = kvset;
-      snapshotAtCreation = snapshot;
-      if (allocator != null) {
-        this.allocatorAtCreation = allocator;
-        this.allocatorAtCreation.incScannerCount();
-      }
-      if (snapshotAllocator != null) {
-        this.snapshotAllocatorAtCreation = snapshotAllocator;
-        this.snapshotAllocatorAtCreation.incScannerCount();
-      }
-    }
-
-    private KeyValue getNext(Iterator<KeyValue> it) {
-      KeyValue startKV = theNext;
-      KeyValue v = null;
-      try {
-        while (it.hasNext()) {
-          v = it.next();
-          if (v.getMvccVersion() <= this.readPoint) {
-            return v;
-          }
-          if (stopSkippingKVsIfNextRow && startKV != null
-              && comparator.compareRows(v, startKV) > 0) {
-            return null;
-          }
-        }
-
-        return null;
-      } finally {
-        if (v != null) {
-          // in all cases, remember the last KV iterated to
-          if (it == snapshotIt) {
-            snapshotItRow = v;
-          } else {
-            kvsetItRow = v;
-          }
-        }
-      }
-    }
-
-    /**
-     *  Set the scanner at the seek key.
-     *  Must be called only once: there is no thread safety between the scanner
-     *   and the memStore.
-     * @param key seek value
-     * @return false if the key is null or if there is no data
-     */
-    @Override
-    public synchronized boolean seek(KeyValue key) {
-      if (key == null) {
-        close();
-        return false;
-      }
-
-      // kvset and snapshot will never be null.
-      // if tailSet can't find anything, SortedSet is empty (not null).
-      kvsetIt = kvsetAtCreation.tailSet(key).iterator();
-      snapshotIt = snapshotAtCreation.tailSet(key).iterator();
-      kvsetItRow = null;
-      snapshotItRow = null;
-
-      return seekInSubLists(key);
-    }
-
-
-    /**
-     * (Re)initialize the iterators after a seek or a reseek.
-     */
-    private synchronized boolean seekInSubLists(KeyValue key){
-      kvsetNextRow = getNext(kvsetIt);
-      snapshotNextRow = getNext(snapshotIt);
-
-      // Calculate the next value
-      theNext = getLowest(kvsetNextRow, snapshotNextRow);
-
-      // has data
-      return (theNext != null);
-    }
-
-
-    /**
-     * Move forward on the sub-lists set previously by seek.
-     * @param key seek value (should be non-null)
-     * @return true if there is at least one KV to read, false otherwise
-     */
-    @Override
-    public synchronized boolean reseek(KeyValue key) {
-      /*
-      See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
-      This code is executed concurrently with flush and puts, without locks.
-      Two points must be known when working on this code:
-      1) It's not possible to use the 'kvTail' and 'snapshot'
-       variables, as they are modified during a flush.
-      2) The ideal implementation for performance would use the sub skip list
-       implicitly pointed by the iterators 'kvsetIt' and
-       'snapshotIt'. Unfortunately the Java API does not offer a method to
-       get it. So we remember the last keys we iterated to and restore
-       the reseeked set to at least that point.
-       */
-
-      kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
-      snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
-
-      return seekInSubLists(key);
-    }
-
-
-    @Override
-    public synchronized KeyValue peek() {
-      //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
-      return theNext;
-    }
-
-    @Override
-    public synchronized KeyValue next() {
-      if (theNext == null) {
-          return null;
-      }
-
-      final KeyValue ret = theNext;
-
-      // Advance one of the iterators
-      if (theNext == kvsetNextRow) {
-        kvsetNextRow = getNext(kvsetIt);
-      } else {
-        snapshotNextRow = getNext(snapshotIt);
-      }
-
-      // Calculate the next value
-      theNext = getLowest(kvsetNextRow, snapshotNextRow);
-
-      //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
-      //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
-      //    getLowest() + " threadpoint=" + readpoint);
-      return ret;
-    }
-
-    /*
-     * Returns the lower of the two key values, or null if they are both null.
-     * This uses comparator.compare() to compare the KeyValue using the memstore
-     * comparator.
-     */
-    private KeyValue getLowest(KeyValue first, KeyValue second) {
-      if (first == null && second == null) {
-        return null;
-      }
-      if (first != null && second != null) {
-        int compare = comparator.compare(first, second);
-        return (compare <= 0 ? first : second);
-      }
-      return (first != null ? first : second);
-    }
-
-    /*
-     * Returns the higher of the two key values, or null if they are both null.
-     * This uses comparator.compare() to compare the KeyValue using the memstore
-     * comparator.
-     */
-    private KeyValue getHighest(KeyValue first, KeyValue second) {
-      if (first == null && second == null) {
-        return null;
-      }
-      if (first != null && second != null) {
-        int compare = comparator.compare(first, second);
-        return (compare > 0 ? first : second);
-      }
-      return (first != null ? first : second);
-    }
-
-    public synchronized void close() {
-      this.kvsetNextRow = null;
-      this.snapshotNextRow = null;
-
-      this.kvsetIt = null;
-      this.snapshotIt = null;
-      
-      if (allocatorAtCreation != null) {
-        this.allocatorAtCreation.decScannerCount();
-        this.allocatorAtCreation = null;
-      }
-      if (snapshotAllocatorAtCreation != null) {
-        this.snapshotAllocatorAtCreation.decScannerCount();
-        this.snapshotAllocatorAtCreation = null;
-      }
-
-      this.kvsetItRow = null;
-      this.snapshotItRow = null;
-    }
-
-    /**
-     * MemStoreScanner returns max value as sequence id because it will
-     * always have the latest data among all files.
-     */
-    @Override
-    public long getSequenceID() {
-      return Long.MAX_VALUE;
-    }
-
-    @Override
-    public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns,
-        long oldestUnexpiredTS) {
-      return shouldSeek(scan, oldestUnexpiredTS);
-    }
-
-    /**
-     * Seek scanner to the given key first. If it returns false(means
-     * peek()==null) or scanner's peek row is bigger than row of given key, seek
-     * the scanner to the previous row of given key
-     */
-    @Override
-    public synchronized boolean backwardSeek(KeyValue key) {
-      seek(key);
-      if (peek() == null || comparator.compareRows(peek(), key) > 0) {
-        return seekToPreviousRow(key);
-      }
-      return true;
-    }
-
-    /**
-     * Separately get the KeyValue before the specified key from kvset and
-     * snapshotset, and use the row of higher one as the previous row of
-     * specified key, then seek to the first KeyValue of previous row
-     */
-    @Override
-    public synchronized boolean seekToPreviousRow(KeyValue key) {
-      KeyValue firstKeyOnRow = KeyValue.createFirstOnRow(key.getRow());
-      SortedSet<KeyValue> kvHead = kvsetAtCreation.headSet(firstKeyOnRow);
-      KeyValue kvsetBeforeRow = kvHead.isEmpty() ? null : kvHead.last();
-      SortedSet<KeyValue> snapshotHead = snapshotAtCreation
-          .headSet(firstKeyOnRow);
-      KeyValue snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
-          .last();
-      KeyValue lastKVBeforeRow = getHighest(kvsetBeforeRow, snapshotBeforeRow);
-      if (lastKVBeforeRow == null) {
-        theNext = null;
-        return false;
-      }
-      KeyValue firstKeyOnPreviousRow = KeyValue
-          .createFirstOnRow(lastKVBeforeRow.getRow());
-      this.stopSkippingKVsIfNextRow = true;
-      seek(firstKeyOnPreviousRow);
-      this.stopSkippingKVsIfNextRow = false;
-      if (peek() == null
-          || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
-        return seekToPreviousRow(lastKVBeforeRow);
-      }
-      return true;
-    }
-
-    @Override
-    public synchronized boolean seekToLastRow() {
-      KeyValue first = kvsetAtCreation.isEmpty() ? null : kvsetAtCreation
-          .last();
-      KeyValue second = snapshotAtCreation.isEmpty() ? null
-          : snapshotAtCreation.last();
-      KeyValue higherKv = getHighest(first, second);
-      if (higherKv == null) {
-        return false;
-      }
-      KeyValue firstKvOnLastRow = KeyValue.createFirstOnRow(higherKv.getRow());
-      if (seek(firstKvOnLastRow)) {
-        return true;
-      } else {
-        return seekToPreviousRow(higherKv);
-      }
-
-    }
-  }
-
-  public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
-
-  public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
-      ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
-      (2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
-
-  /*
-   * Calculate how the MemStore size has changed.  Includes overhead of the
-   * backing Map.
-   * @param kv
-   * @param notpresent True if the kv was NOT present in the set.
-   * @return Size
-   */
-  static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
-    return notpresent ?
-        ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
-        0;
-  }
-
-  /**
-   * Get the entire heap usage for this MemStore not including keys in the
-   * snapshot.
-   */
-  @Override
-  public long heapSize() {
-    return size.get();
-  }
+  long upsert(Iterable<Cell> cells, long readpoint);
 
   /**
-   * Get the heap usage of KVs in this MemStore.
+   * @return scanners over the memstore. This might include a scanner over the snapshot when one
+   * is present.
    */
-  public long keySize() {
-    return heapSize() - DEEP_OVERHEAD;
-  }
+  List<KeyValueScanner> getScanners(long readPt);
 
   /**
-   * Code to help figure if our approximation of object heap sizes is close
-   * enough.  See hbase-900.  Fills memstores then waits so user can heap
-   * dump and bring up resultant hprof in something like jprofiler which
-   * allows you get 'deep size' on objects.
-   * @param args main args
+   * @return Total memory occupied by this MemStore.
    */
-  public static void main(String [] args) {
-    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
-    LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
-      runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
-    LOG.info("vmInputArguments=" + runtime.getInputArguments());
-    MemStore memstore1 = new MemStore();
-    // TODO: x32 vs x64
-    long size = 0;
-    final int count = 10000;
-    byte [] fam = Bytes.toBytes("col");
-    byte [] qf = Bytes.toBytes("umn");
-    byte [] empty = new byte[0];
-    for (int i = 0; i < count; i++) {
-      // Give each its own ts
-      size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
-    }
-    LOG.info("memstore1 estimated size=" + size);
-    for (int i = 0; i < count; i++) {
-      size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
-    }
-    LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
-    // Make a variably sized memstore.
-    MemStore memstore2 = new MemStore();
-    for (int i = 0; i < count; i++) {
-      size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
-        new byte[i]));
-    }
-    LOG.info("memstore2 estimated size=" + size);
-    final int seconds = 30;
-    LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
-    for (int i = 0; i < seconds; i++) {
-      // Thread.sleep(1000);
-    }
-    LOG.info("Exiting.");
-  }
+  long size();
 }

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java?rev=1577541&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java Fri Mar 14 14:07:10 2014
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Holds details of the snapshot taken on a MemStore. Details include the snapshot's identifier,
+ * count of cells in it and total memory size occupied by all the cells, timestamp information of
+ * all the cells and a scanner to read all cells in it.
+ */
+@InterfaceAudience.Private
+public class MemStoreSnapshot {
+
+  private final long id;
+  private final int cellsCount;
+  private final long size;
+  private final TimeRangeTracker timeRangeTracker;
+  private final KeyValueScanner scanner;
+
+  public MemStoreSnapshot(long id, int cellsCount, long size, TimeRangeTracker timeRangeTracker,
+      KeyValueScanner scanner) {
+    this.id = id;
+    this.cellsCount = cellsCount;
+    this.size = size;
+    this.timeRangeTracker = timeRangeTracker;
+    this.scanner = scanner;
+  }
+
+  /**
+   * @return snapshot's identifier.
+   */
+  public long getId() {
+    return id;
+  }
+
+  /**
+   * @return Number of Cells in this snapshot.
+   */
+  public int getCellsCount() {
+    return cellsCount;
+  }
+
+  /**
+   * @return Total memory size occupied by this snapshot.
+   */
+  public long getSize() {
+    return size;
+  }
+
+  /**
+   * @return {@link TimeRangeTracker} for all the Cells in the snapshot.
+   */
+  public TimeRangeTracker getTimeRangeTracker() {
+    return this.timeRangeTracker;
+  }
+
+  /**
+   * @return {@link KeyValueScanner} for iterating over the snapshot
+   */
+  public KeyValueScanner getScanner() {
+    return this.scanner;
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Fri Mar 14 14:07:10 2014
@@ -22,11 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -37,7 +33,6 @@ import org.apache.hadoop.hbase.KeyValueU
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 
 /**
  * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
@@ -57,15 +52,11 @@ abstract class StoreFlusher {
    * Turns a snapshot of memstore into a set of store files.
    * @param snapshot Memstore snapshot.
    * @param cacheFlushSeqNum Log cache flush sequence number.
-   * @param snapshotTimeRangeTracker Time range tracker from the memstore
-   *                                 pertaining to the snapshot.
-   * @param flushedSize Out parameter for the size of the KVs flushed.
    * @param status Task that represents the flush operation and may be updated with status.
    * @return List of files written. Can be empty; must not be null.
    */
-  public abstract List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
-      TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
-      throws IOException;
+  public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
+      MonitoredTask status) throws IOException;
 
   protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
       MonitoredTask status) throws IOException {
@@ -81,21 +72,21 @@ abstract class StoreFlusher {
 
   /**
    * Creates the scanner for flushing snapshot. Also calls coprocessors.
+   * @param snapshotScanner Scanner over the memstore snapshot that is being flushed.
+   * @param smallestReadPoint Smallest read point used for the flush.
    * @return The scanner; null if coprocessor is canceling the flush.
    */
-  protected InternalScanner createScanner(SortedSet<KeyValue> snapshot,
+  protected InternalScanner createScanner(KeyValueScanner snapshotScanner,
       long smallestReadPoint) throws IOException {
-    KeyValueScanner memstoreScanner =
-        new CollectionBackedScanner(snapshot, store.getComparator());
     InternalScanner scanner = null;
     if (store.getCoprocessorHost() != null) {
-      scanner = store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
+      scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner);
     }
     if (scanner == null) {
       Scan scan = new Scan();
       scan.setMaxVersions(store.getScanInfo().getMaxVersions());
       scanner = new StoreScanner(store, store.getScanInfo(), scan,
-          Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+          Collections.singletonList(snapshotScanner), ScanType.COMPACT_RETAIN_DELETES,
           smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
     }
     assert scanner != null;
@@ -115,15 +106,13 @@ abstract class StoreFlusher {
    * @param scanner Scanner to get data from.
    * @param sink Sink to write data to. Could be StoreFile.Writer.
    * @param smallestReadPoint Smallest read point used for the flush.
-   * @return Bytes flushed.
    */
-  protected long performFlush(InternalScanner scanner,
+  protected void performFlush(InternalScanner scanner,
       Compactor.CellSink sink, long smallestReadPoint) throws IOException {
     int compactionKVMax =
       conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
     List<Cell> kvs = new ArrayList<Cell>();
     boolean hasMore;
-    long flushed = 0;
     do {
       hasMore = scanner.next(kvs, compactionKVMax);
       if (!kvs.isEmpty()) {
@@ -139,11 +128,9 @@ abstract class StoreFlusher {
             kv.setMvccVersion(0);
           }
           sink.append(kv);
-          flushed += MemStore.heapSizeChange(kv, true);
         }
         kvs.clear();
       }
     } while (hasMore);
-    return flushed;
   }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java Fri Mar 14 14:07:10 2014
@@ -21,21 +21,16 @@ package org.apache.hadoop.hbase.regionse
 import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.SortedSet;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
-import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -57,33 +52,32 @@ public class StripeStoreFlusher extends 
   }
 
   @Override
-  public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
-      final TimeRangeTracker tracker, AtomicLong flushedSize, MonitoredTask status)
-          throws IOException {
+  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
+      MonitoredTask status) throws IOException {
     List<Path> result = null;
-    int kvCount = snapshot.size();
-    if (kvCount == 0) return result; // don't flush if there are no entries
+    int cellsCount = snapshot.getCellsCount();
+    if (cellsCount == 0) return result; // don't flush if there are no entries
 
     long smallestReadPoint = store.getSmallestReadPoint();
-    InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
+    InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
     if (scanner == null) {
       return result; // NULL scanner returned from coprocessor hooks means skip normal processing
     }
 
     // Let policy select flush method.
-    StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount);
+    StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount);
 
-    long flushedBytes = 0;
     boolean success = false;
     StripeMultiFileWriter mw = null;
     try {
       mw = req.createWriter(); // Writer according to the policy.
-      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, kvCount);
+      StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
+          snapshot.getTimeRangeTracker(), cellsCount);
       StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
       mw.init(storeScanner, factory, store.getComparator());
 
       synchronized (flushLock) {
-        flushedBytes = performFlush(scanner, mw, smallestReadPoint);
+        performFlush(scanner, mw, smallestReadPoint);
         result = mw.commitWriters(cacheFlushSeqNum, false);
         success = true;
       }
@@ -100,7 +94,6 @@ public class StripeStoreFlusher extends 
           }
         }
       }
-      flushedSize.set(flushedBytes);
       try {
         scanner.close();
       } catch (IOException ex) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java?rev=1577541&r1=1577540&r2=1577541&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java Fri Mar 14 14:07:10 2014
@@ -44,10 +44,10 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.regionserver.DefaultMemStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.KeyValueSkipListSet;
-import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.junit.BeforeClass;
@@ -291,17 +291,17 @@ public class TestHeapSize  {
       assertEquals(expected, actual);
     }
 
-    // MemStore Overhead
-    cl = MemStore.class;
-    actual = MemStore.FIXED_OVERHEAD;
+    // DefaultMemStore Overhead
+    cl = DefaultMemStore.class;
+    actual = DefaultMemStore.FIXED_OVERHEAD;
     expected = ClassSize.estimateBase(cl, false);
     if(expected != actual) {
       ClassSize.estimateBase(cl, true);
       assertEquals(expected, actual);
     }
 
-    // MemStore Deep Overhead
-    actual = MemStore.DEEP_OVERHEAD;
+    // DefaultMemStore Deep Overhead
+    actual = DefaultMemStore.DEEP_OVERHEAD;
     expected = ClassSize.estimateBase(cl, false);
     expected += ClassSize.estimateBase(AtomicLong.class, false);
     expected += (2 * ClassSize.estimateBase(KeyValueSkipListSet.class, false));



Mime
View raw message