hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ramkris...@apache.org
Subject hbase git commit: HBASE-18221 Switch from pread to stream should happen under HStore's reentrant lock (Ram)
Date Fri, 23 Jun 2017 05:04:26 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 0357c2266 -> 49825aec6


HBASE-18221 Switch from pread to stream should happen under HStore's
reentrant lock (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/49825aec
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/49825aec
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/49825aec

Branch: refs/heads/branch-2
Commit: 49825aec6b6c6bf0d0c56fc35730efe80bafcb7b
Parents: 0357c22
Author: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Authored: Fri Jun 23 10:32:29 2017 +0530
Committer: Ramkrishna <ramkrishna.s.vasudevan@intel.com>
Committed: Fri Jun 23 10:34:10 2017 +0530

----------------------------------------------------------------------
 .../regionserver/DefaultStoreFileManager.java   |  16 +-
 .../hadoop/hbase/regionserver/HStore.java       |  46 ++++++
 .../apache/hadoop/hbase/regionserver/Store.java |  28 ++++
 .../hbase/regionserver/StoreFileManager.java    |   6 +
 .../hadoop/hbase/regionserver/StoreScanner.java |  26 ++--
 .../regionserver/StripeStoreFileManager.java    |   5 +
 .../hadoop/hbase/regionserver/TestStore.java    | 146 ++++++++++++++++---
 7 files changed, 235 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/49825aec/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
index f4f9aa6..915d62f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
@@ -27,10 +27,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +36,10 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
 /**
  * Default implementation of StoreFileManager. Not thread-safe.
  */
@@ -117,6 +117,14 @@ class DefaultStoreFileManager implements StoreFileManager {
   }
 
   @Override
+  public final int getCompactedFilesCount() {
+    if (compactedfiles == null) {
+      return 0;
+    }
+    return compactedfiles.size();
+  }
+
+  @Override
   public void addCompactionResults(
     Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) {
     ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);

http://git-wip-us.apache.org/repos/asf/hbase/blob/49825aec/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 721299e..03c31af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -726,6 +727,11 @@ public class HStore implements Store {
     return this.storeEngine.getStoreFileManager().getStorefiles();
   }
 
+  @Override
+  public Collection<StoreFile> getCompactedFiles() {
+    return this.storeEngine.getStoreFileManager().getCompactedfiles();
+  }
+
   /**
    * This throws a WrongRegionException if the HFile does not fit in this region, or an
    * InvalidHFileException if the HFile is not valid.
@@ -1927,6 +1933,41 @@ public class HStore implements Store {
   }
 
   @Override
+  public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
+      boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+      byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long
readPt,
+      boolean includeMemstoreScanner) throws IOException {
+    this.lock.readLock().lock();
+    try {
+      Map<String, StoreFile> name2File =
+          new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
+      for (StoreFile file : getStorefiles()) {
+        name2File.put(file.getFileInfo().getActiveFileName(), file);
+      }
+      if (getCompactedFiles() != null) {
+        for (StoreFile file : getCompactedFiles()) {
+          name2File.put(file.getFileInfo().getActiveFileName(), file);
+        }
+      }
+      List<StoreFile> filesToReopen = new ArrayList<>();
+      for (KeyValueScanner kvs : currentFileScanners) {
+        assert kvs.isFileScanner();
+        if (kvs.peek() == null) {
+          continue;
+        }
+        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
+      }
+      if (filesToReopen.isEmpty()) {
+        return null;
+      }
+      return getScanners(filesToReopen, cacheBlocks, false, false, matcher, startRow,
+        includeStartRow, stopRow, includeStopRow, readPt, false);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  @Override
   public String toString() {
     return this.getColumnFamilyName();
   }
@@ -1937,6 +1978,11 @@ public class HStore implements Store {
   }
 
   @Override
+  public int getCompactedFilesCount() {
+    return this.storeEngine.getStoreFileManager().getCompactedFilesCount();
+  }
+
+  @Override
   public long getMaxStoreFileAge() {
     long earliestTS = Long.MAX_VALUE;
     for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/49825aec/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 76595f3..766b562 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -61,6 +61,8 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
 
   Collection<StoreFile> getStorefiles();
 
+  Collection<StoreFile> getCompactedFiles();
+
   /**
    * Close all the readers We don't need to worry about subsequent requests because the Region
    * holds a write lock that will prevent any more reads or writes.
@@ -116,6 +118,27 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
       boolean includeStopRow, long readPt) throws IOException;
 
   /**
+   * Recreates the scanners on the current list of active store file scanners
+   * @param currentFileScanners the current set of active store file scanners
+   * @param cacheBlocks cache the blocks or not
+   * @param usePread use pread or not
+   * @param isCompaction is the scanner for compaction
+   * @param matcher the scan query matcher
+   * @param startRow the scan's start row
+   * @param includeStartRow should the scan include the start row
+   * @param stopRow the scan's stop row
+   * @param includeStopRow should the scan include the stop row
+   * @param readPt the read point of the current scane
+   * @param includeMemstoreScanner whether the current scanner should include memstorescanner
+   * @return list of scanners recreated on the current Scanners
+   * @throws IOException
+   */
+  List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners,
+      boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
+      byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long
readPt,
+      boolean includeMemstoreScanner) throws IOException;
+
+  /**
    * Create scanners on the given files and if needed on the memstore with no filtering based
on TTL
    * (that happens further down the line).
    * @param files the list of files on which the scanners has to be created
@@ -367,6 +390,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
   int getStorefilesCount();
 
   /**
+   * @return Count of compacted store files
+   */
+  int getCompactedFilesCount();
+
+  /**
    * @return Max age of store files in this store
    */
   long getMaxStoreFileAge();

http://git-wip-us.apache.org/repos/asf/hbase/blob/49825aec/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 933849c..eb760ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -104,6 +104,12 @@ public interface StoreFileManager {
   int getStorefileCount();
 
   /**
+   * Returns the number of compacted files.
+   * @return The number of files.
+   */
+  int getCompactedFilesCount();
+
+  /**
    * Gets the store files to scan for a Scan or Get request.
    * @param startRow Start row of the request.
    * @param stopRow Stop row of the request.

http://git-wip-us.apache.org/repos/asf/hbase/blob/49825aec/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 9849c93..11301d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -966,7 +966,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     return heap.reseek(kv);
   }
 
-  private void trySwitchToStreamRead() {
+  @VisibleForTesting
+  void trySwitchToStreamRead() {
     if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
||
         bytesRead < preadMaxBytes) {
       return;
@@ -977,34 +978,27 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
     scanUsePread = false;
     Cell lastTop = heap.peek();
-    Map<String, StoreFile> name2File = new HashMap<>(store.getStorefilesCount());
-    for (StoreFile file : store.getStorefiles()) {
-      name2File.put(file.getFileInfo().getActiveFileName(), file);
-    }
-    List<StoreFile> filesToReopen = new ArrayList<>();
     List<KeyValueScanner> memstoreScanners = new ArrayList<>();
     List<KeyValueScanner> scannersToClose = new ArrayList<>();
     for (KeyValueScanner kvs : currentScanners) {
       if (!kvs.isFileScanner()) {
+        // collect memstorescanners here
         memstoreScanners.add(kvs);
       } else {
         scannersToClose.add(kvs);
-        if (kvs.peek() == null) {
-          continue;
-        }
-        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
       }
     }
-    if (filesToReopen.isEmpty()) {
-      return;
-    }
     List<KeyValueScanner> fileScanners = null;
     List<KeyValueScanner> newCurrentScanners;
     KeyValueHeap newHeap;
     try {
-      fileScanners =
-          store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),
-            scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);
+      // recreate the scanners on the current file scanners
+      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
+        matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
+        scan.includeStopRow(), readPt, false);
+      if (fileScanners == null) {
+        return;
+      }
       seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
       newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
       newCurrentScanners.addAll(fileScanners);

http://git-wip-us.apache.org/repos/asf/hbase/blob/49825aec/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
index 3c7469e..18a6eec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
@@ -147,6 +147,11 @@ public class StripeStoreFileManager
   }
 
   @Override
+  public int getCompactedFilesCount() {
+    return state.allCompactedFilesCached.size();
+  }
+
+  @Override
   public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
     CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
     // Passing null does not cause NPE??

http://git-wip-us.apache.org/repos/asf/hbase/blob/49825aec/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 22539c5..2318414 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -45,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import org.apache.commons.logging.Log;
@@ -104,7 +106,6 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Test class for the Store
@@ -161,19 +162,19 @@ public class TestStore {
     init(methodName, TEST_UTIL.getConfiguration());
   }
 
-  private void init(String methodName, Configuration conf)
+  private Store init(String methodName, Configuration conf)
   throws IOException {
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     // some of the tests write 4 versions and then flush
     // (with HBASE-4241, lower versions are collected on flush)
     hcd.setMaxVersions(4);
-    init(methodName, conf, hcd);
+    return init(methodName, conf, hcd);
   }
 
-  private void init(String methodName, Configuration conf,
+  private Store init(String methodName, Configuration conf,
       HColumnDescriptor hcd) throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
-    init(methodName, conf, htd, hcd);
+    return init(methodName, conf, htd, hcd);
   }
 
   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
@@ -184,6 +185,11 @@ public class TestStore {
   @SuppressWarnings("deprecation")
   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
       HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
+    return init(methodName, conf, htd, hcd, hook, false);
+  }
+  @SuppressWarnings("deprecation")
+  private Store init(String methodName, Configuration conf, HTableDescriptor htd,
+      HColumnDescriptor hcd, MyScannerHook hook, boolean switchToPread) throws IOException
{
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -198,7 +204,8 @@ public class TestStore {
     } else {
       htd.addFamily(hcd);
     }
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, MemStoreLABImpl.CHUNK_SIZE_DEFAULT,
1, 0, null);
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
+      MemStoreLABImpl.CHUNK_SIZE_DEFAULT, 1, 0, null);
     HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
     final Configuration walConf = new Configuration(conf);
     FSUtils.setRootDir(walConf, basedir);
@@ -208,7 +215,7 @@ public class TestStore {
     if (hook == null) {
       store = new HStore(region, hcd, conf);
     } else {
-      store = new MyStore(region, hcd, conf, hook);
+      store = new MyStore(region, hcd, conf, hook, switchToPread);
     }
     return store;
   }
@@ -833,9 +840,10 @@ public class TestStore {
 
   public static class DummyStoreEngine extends DefaultStoreEngine {
     public static DefaultCompactor lastCreatedCompactor = null;
+
     @Override
-    protected void createComponents(
-        Configuration conf, Store store, CellComparator comparator) throws IOException {
+    protected void createComponents(Configuration conf, Store store, CellComparator comparator)
+        throws IOException {
       super.createComponents(conf, store, comparator);
       lastCreatedCompactor = this.compactor;
     }
@@ -1039,6 +1047,13 @@ public class TestStore {
     return c;
   }
 
+  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[]
value)
+      throws IOException {
+    Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(),
value);
+    CellUtil.setSequenceId(c, sequenceId);
+    return c;
+  }
+
   @Test
   public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException
{
     Configuration conf = HBaseConfiguration.create();
@@ -1269,35 +1284,130 @@ public class TestStore {
     storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
   }
 
-  private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook)
throws IOException {
+  private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook)
+      throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setMaxVersions(5);
     return (MyStore) init(methodName, conf, htd, hcd, hook);
   }
 
-  private static class MyStore extends HStore {
+  class MyStore extends HStore {
     private final MyScannerHook hook;
-    MyStore(final HRegion region, final HColumnDescriptor family,
-        final Configuration confParam, MyScannerHook hook) throws IOException {
+
+    MyStore(final HRegion region, final HColumnDescriptor family, final Configuration confParam,
+        MyScannerHook hook, boolean switchToPread) throws IOException {
       super(region, family, confParam);
       this.hook = hook;
     }
 
     @Override
     public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
-      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
-      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
-      boolean includeMemstoreScanner) throws IOException {
+        boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+        boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+        boolean includeMemstoreScanner) throws IOException {
       hook.hook(this);
-      return super.getScanners(files, cacheBlocks, usePread,
-          isCompaction, matcher, startRow, true, stopRow, false, readPt, includeMemstoreScanner);
+      return super.getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow,
true,
+        stopRow, false, readPt, includeMemstoreScanner);
     }
   }
   private interface MyScannerHook {
     void hook(MyStore store) throws IOException;
   }
 
+  @Test
+  public void testSwitchingPreadtoStreamParallelyWithCompactionDischarger() throws Exception
{
+    int flushSize = 500;
+    Configuration conf = HBaseConfiguration.create();
+    conf.set("hbase.hstore.engine.class", DummyStoreEngine.class.getName());
+    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 0);
+    // Set the lower threshold to invoke the "MERGE" policy
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
+    MyStore store = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
+      @Override
+      public void hook(org.apache.hadoop.hbase.regionserver.TestStore.MyStore store)
+          throws IOException {
+      }
+    });
+    MemstoreSize memStoreSize = new MemstoreSize();
+    long ts = System.currentTimeMillis();
+    long seqID = 1l;
+    // Add some data to the region and do some flushes
+    for (int i = 1; i < 10; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+        memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+    for (int i = 11; i < 20; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+        memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+    for (int i = 21; i < 30; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+        memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+
+    assertEquals(3, store.getStorefilesCount());
+    ScanInfo scanInfo = store.getScanInfo();
+    Scan scan = new Scan();
+    scan.addFamily(family);
+    Collection<StoreFile> storefiles2 = store.getStorefiles();
+    ArrayList<StoreFile> actualStorefiles = Lists.newArrayList(storefiles2);
+    StoreScanner storeScanner =
+        (StoreScanner) store.getScanner(scan, scan.getFamilyMap().get(family), Long.MAX_VALUE);
+    // get the current heap
+    KeyValueHeap heap = storeScanner.heap;
+    // create more store files
+    for (int i = 31; i < 40; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+        memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+
+    for (int i = 41; i < 50; i++) {
+      store.add(createCell(Bytes.toBytes("row" + i), qf1, ts, seqID++, Bytes.toBytes("")),
+        memStoreSize);
+    }
+    // flush them
+    flushStore(store, seqID);
+    storefiles2 = store.getStorefiles();
+    ArrayList<StoreFile> actualStorefiles1 = Lists.newArrayList(storefiles2);
+    actualStorefiles1.removeAll(actualStorefiles);
+    // Do compaction
+    List<Exception> exceptions = new ArrayList<Exception>();
+    MyThread thread = new MyThread(storeScanner);
+    thread.start();
+    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
+    thread.join();
+    KeyValueHeap heap2 = thread.getHeap();
+    assertFalse(heap.equals(heap2));
+  }
+
+  private static class MyThread extends Thread {
+    private StoreScanner scanner;
+    private KeyValueHeap heap;
+
+    public MyThread(StoreScanner scanner) {
+      this.scanner = scanner;
+    }
+
+    public KeyValueHeap getHeap() {
+      return this.heap;
+    }
+
+    public void run() {
+      scanner.trySwitchToStreamRead();
+      heap = scanner.heap;
+    }
+  }
+
   private static class MyMemStoreCompactor extends MemStoreCompactor {
     private static final AtomicInteger RUNNER_COUNT = new AtomicInteger(0);
     private static final CountDownLatch START_COMPACTOR_LATCH = new CountDownLatch(1);


Mime
View raw message