hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chia7...@apache.org
Subject hbase git commit: HBASE-17887 Row-level consistency is broken for read
Date Fri, 12 May 2017 11:45:16 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 5e046151d -> b34ab5980


HBASE-17887 Row-level consistency is broken for read


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b34ab598
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b34ab598
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b34ab598

Branch: refs/heads/master
Commit: b34ab5980ea7a21fd750537476027f9a8665eacc
Parents: 5e04615
Author: Chia-Ping Tsai <chia7712@gmail.com>
Authored: Fri May 12 19:45:07 2017 +0800
Committer: Chia-Ping Tsai <chia7712@gmail.com>
Committed: Fri May 12 19:45:07 2017 +0800

----------------------------------------------------------------------
 .../regionserver/ChangedReadersObserver.java    |  12 +-
 .../hbase/regionserver/CompactingMemStore.java  |  12 +-
 .../hadoop/hbase/regionserver/HStore.java       |  26 ++-
 .../hadoop/hbase/regionserver/StoreScanner.java |  38 +++-
 .../hadoop/hbase/regionserver/TestStore.java    | 225 ++++++++++++++++++-
 .../hbase/regionserver/TestStoreScanner.java    |   5 +-
 .../hbase/regionserver/TestWideScanner.java     |   3 +-
 7 files changed, 293 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b34ab598/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
index 0bc75e7..a019666 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChangedReadersObserver.java
@@ -21,18 +21,24 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.List;
-
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
-
 /**
  * If set of MapFile.Readers in Store change, implementors are notified.
  */
 @InterfaceAudience.Private
 public interface ChangedReadersObserver {
+
+  /**
+   * @return the read point of the current scan
+   */
+  long getReadPoint();
+
   /**
    * Notify observers.
+   * @param sfs The new files
+   * @param memStoreScanners scanner of current memstore
    * @throws IOException e
    */
-  void updateReaders(List<StoreFile> sfs) throws IOException;
+  void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b34ab598/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index aaf60e3..8d71efc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -317,18 +317,24 @@ public class CompactingMemStore extends AbstractMemStore {
    * Scanners are ordered from 0 (oldest) to newest in increasing order.
    */
   public List<KeyValueScanner> getScanners(long readPt) throws IOException {
+    MutableSegment activeTmp = active;
     List<? extends Segment> pipelineList = pipeline.getSegments();
     List<? extends Segment> snapshotList = snapshot.getAllSegments();
     long order = 1 + pipelineList.size() + snapshotList.size();
     // The list of elements in pipeline + the active element + the snapshot segment
     // The order is the Segment ordinal
-    List<KeyValueScanner> list = new ArrayList<KeyValueScanner>((int) order);
-    order = addToScanners(active, readPt, order, list);
+    List<KeyValueScanner> list = createList((int) order);
+    order = addToScanners(activeTmp, readPt, order, list);
     order = addToScanners(pipelineList, readPt, order, list);
     addToScanners(snapshotList, readPt, order, list);
     return list;
   }
 
+   @VisibleForTesting
+   protected List<KeyValueScanner> createList(int capacity) {
+     return new ArrayList<>(capacity);
+   }
+
   /**
    * Check whether anything need to be done based on the current active set size.
    * The method is invoked upon every addition to the active set.
@@ -428,7 +434,7 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
-  private void pushActiveToPipeline(MutableSegment active) {
+  protected void pushActiveToPipeline(MutableSegment active) {
     if (!active.isEmpty()) {
       pipeline.pushHead(active);
       resetActive();

http://git-wip-us.apache.org/repos/asf/hbase/blob/b34ab598/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index cbdaa1b..051471e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -105,7 +105,7 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
  */
 @InterfaceAudience.Private
 public class HStore implements Store {
-  private static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
+  public static final String MEMSTORE_CLASS_NAME = "hbase.regionserver.memstore.class";
   public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
       "hbase.server.compactchecker.interval.multiplier";
   public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
@@ -244,24 +244,27 @@ public class HStore implements Store {
     // Why not just pass a HColumnDescriptor in here altogether?  Even if have
     // to clone it?
     scanInfo = new ScanInfo(conf, family, ttl, timeToPurgeDeletes, this.comparator);
-    String className = conf.get(MEMSTORE_CLASS_NAME, DefaultMemStore.class.getName());
     MemoryCompactionPolicy inMemoryCompaction = family.getInMemoryCompaction();
     if(inMemoryCompaction == null) {
       inMemoryCompaction = MemoryCompactionPolicy.valueOf(
           conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
               CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT));
     }
+    String className;
     switch (inMemoryCompaction) {
       case BASIC :
       case EAGER :
-        className = CompactingMemStore.class.getName();
-        this.memstore = new CompactingMemStore(conf, this.comparator, this,
-            this.getHRegion().getRegionServicesForStores(), inMemoryCompaction);
+        Class<? extends CompactingMemStore> clz = conf.getClass(MEMSTORE_CLASS_NAME,
+          CompactingMemStore.class, CompactingMemStore.class);
+        className = clz.getName();
+        this.memstore = ReflectionUtils.newInstance(clz, new Object[] { conf, this.comparator, this,
+            this.getHRegion().getRegionServicesForStores(), inMemoryCompaction});
         break;
       case NONE :
       default:
-          this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
-          Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });
+        className = DefaultMemStore.class.getName();
+        this.memstore = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
+        Configuration.class, CellComparator.class }, new Object[] { conf, this.comparator });
     }
     LOG.info("Memstore class name is " + className);
     this.offPeakHours = OffPeakHours.getInstance(conf);
@@ -1149,7 +1152,14 @@ public class HStore implements Store {
    */
   private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException {
     for (ChangedReadersObserver o : this.changedReaderObservers) {
-      o.updateReaders(sfs);
+      List<KeyValueScanner> memStoreScanners;
+      this.lock.readLock().lock();
+      try {
+        memStoreScanners = this.memstore.getScanners(o.getReadPoint());
+      } finally {
+        this.lock.readLock().unlock();
+      }
+      o.updateReaders(sfs, memStoreScanners);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b34ab598/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 338a68c..e9feb37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatc
 import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
+import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
@@ -145,6 +146,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private volatile boolean flushed = false;
   // generally we get one file from a flush
   private final List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
+  // generally we get one memstore scanner from a flush
+  private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(1);
   // The current list of scanners
   private final List<KeyValueScanner> currentScanners = new ArrayList<>();
   // flush update lock
@@ -493,6 +496,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       this.store.deleteChangedReaderObserver(this);
     }
     if (withHeapClose) {
+      clearAndClose(memStoreScannersAfterFlush);
       for (KeyValueHeap h : this.heapsForDelayedClose) {
         h.close();
       }
@@ -814,13 +818,33 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     return true;
   }
 
+  @Override
+  public long getReadPoint() {
+    return this.readPt;
+  }
+
+  private static void clearAndClose(List<KeyValueScanner> scanners) {
+    for (KeyValueScanner s : scanners) {
+      s.close();
+    }
+    scanners.clear();
+  }
+
   // Implementation of ChangedReadersObserver
   @Override
-  public void updateReaders(List<StoreFile> sfs) throws IOException {
-    flushed = true;
+  public void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners) throws IOException {
+    if (CollectionUtils.isEmpty(sfs)
+      && CollectionUtils.isEmpty(memStoreScanners)) {
+      return;
+    }
     flushLock.lock();
     try {
+      flushed = true;
       flushedStoreFiles.addAll(sfs);
+      if (!CollectionUtils.isEmpty(memStoreScanners)) {
+        clearAndClose(memStoreScannersAfterFlush);
+        memStoreScannersAfterFlush.addAll(memStoreScanners);
+      }
     } finally {
       flushLock.unlock();
     }
@@ -838,10 +862,14 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     List<KeyValueScanner> scanners;
     flushLock.lock();
     try {
-      scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get,
-        scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
+      List<KeyValueScanner> allScanners = new ArrayList<>(flushedStoreFiles.size() + memStoreScannersAfterFlush.size());
+      allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get,
+        scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false));
+      allScanners.addAll(memStoreScannersAfterFlush);
+      scanners = selectScannersFrom(allScanners);
       // Clear the current set of flushed store files so that they don't get added again
       flushedStoreFiles.clear();
+      memStoreScannersAfterFlush.clear();
     } finally {
       flushLock.unlock();
     }
@@ -851,7 +879,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     // remove the older memstore scanner
     for (int i = 0; i < currentScanners.size(); i++) {
       if (!currentScanners.get(i).isFileScanner()) {
-        currentScanners.remove(i);
+        currentScanners.remove(i).close();
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b34ab598/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 64e8397..6ea8eaa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.security.PrivilegedExceptionAction;
@@ -38,9 +37,16 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.IntStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -53,14 +59,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -70,6 +79,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -161,9 +171,14 @@ public class TestStore {
     init(methodName, conf, htd, hcd);
   }
 
-  @SuppressWarnings("deprecation")
   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
       HColumnDescriptor hcd) throws IOException {
+    return init(methodName, conf, htd, hcd, null);
+  }
+
+  @SuppressWarnings("deprecation")
+  private Store init(String methodName, Configuration conf, HTableDescriptor htd,
+      HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -185,8 +200,11 @@ public class TestStore {
     final WALFactory wals = new WALFactory(walConf, null, methodName);
     HRegion region = new HRegion(tableDir, wals.getWAL(info.getEncodedNameAsBytes(),
             info.getTable().getNamespace()), fs, conf, info, htd, null);
-
-    store = new HStore(region, hcd, conf);
+    if (hook == null) {
+      store = new HStore(region, hcd, conf);
+    } else {
+      store = new MyStore(region, hcd, conf, hook);
+    }
     return store;
   }
 
@@ -939,4 +957,199 @@ public class TestStore {
     //ensure that replaceStoreFiles is not called if files are not refreshed
     verify(spiedStore, times(0)).replaceStoreFiles(null, null);
   }
-}
+
+  private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
+    Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
+    CellUtil.setSequenceId(c, sequenceId);
+    return c;
+  }
+
+  @Test
+  public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(HStore.MEMSTORE_CLASS_NAME, MyCompactingMemStore.class.getName());
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setInMemoryCompaction(MemoryCompactionPolicy.BASIC);
+    init(name.getMethodName(), conf, hcd);
+    byte[] value = Bytes.toBytes("value");
+    MemstoreSize memStoreSize = new MemstoreSize();
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    // older data which shouldn't be "seen" by client
+    store.add(createCell(qf1, ts, seqId, value), memStoreSize);
+    store.add(createCell(qf2, ts, seqId, value), memStoreSize);
+    store.add(createCell(qf3, ts, seqId, value), memStoreSize);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+    StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
+    MyCompactingMemStore.START_TEST.set(true);
+    Runnable flush = () -> {
+      // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
+      // recreate the active memstore -- phase (4/5)
+      storeFlushCtx.prepare();
+    };
+    ExecutorService service = Executors.newSingleThreadExecutor();
+    service.submit(flush);
+    // we get scanner from pipeline and snapshot but they are empty. -- phase (2/5)
+    // this is blocked until we recreate the active memstore -- phase (3/5)
+    // we get scanner from active memstore but it is empty -- phase (5/5)
+    InternalScanner scanner = (InternalScanner) store.getScanner(
+          new Scan(new Get(row)), quals, seqId + 1);
+    service.shutdown();
+    service.awaitTermination(20, TimeUnit.SECONDS);
+    try {
+      try {
+        List<Cell> results = new ArrayList<>();
+        scanner.next(results);
+        assertEquals(3, results.size());
+        for (Cell c : results) {
+          byte[] actualValue = CellUtil.cloneValue(c);
+          assertTrue("expected:" + Bytes.toStringBinary(value)
+            + ", actual:" + Bytes.toStringBinary(actualValue)
+            , Bytes.equals(actualValue, value));
+        }
+      } finally {
+        scanner.close();
+      }
+    } finally {
+      MyCompactingMemStore.START_TEST.set(false);
+      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+    }
+  }
+
+  @Test
+  public void testScanWithDoubleFlush() throws IOException {
+    Configuration conf = HBaseConfiguration.create();
+    // Initialize region
+    MyStore myStore = initMyStore(name.getMethodName(), conf, (final MyStore store1) -> {
+      final long tmpId = id++;
+      ExecutorService s = Executors.newSingleThreadExecutor();
+      s.submit(() -> {
+        try {
+          // flush the store before storescanner updates the scanners from store.
+          // The current data will be flushed into files, and the memstore will
+          // be clear.
+          // -- phase (4/4)
+          flushStore(store1, tmpId);
+        }catch (IOException ex) {
+          throw new RuntimeException(ex);
+        }
+      });
+      s.shutdown();
+      try {
+        // wait for the flush, the thread will be blocked in HStore#notifyChangedReadersObservers.
+        s.awaitTermination(3, TimeUnit.SECONDS);
+      } catch (InterruptedException ex) {
+      }
+    });
+    byte[] oldValue = Bytes.toBytes("oldValue");
+    byte[] currentValue = Bytes.toBytes("currentValue");
+    MemstoreSize memStoreSize = new MemstoreSize();
+    long ts = EnvironmentEdgeManager.currentTime();
+    long seqId = 100;
+    // older data which shouldn't be "seen" by client
+    myStore.add(createCell(qf1, ts, seqId, oldValue), memStoreSize);
+    myStore.add(createCell(qf2, ts, seqId, oldValue), memStoreSize);
+    myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSize);
+    long snapshotId = id++;
+    // push older data into snapshot -- phase (1/4)
+    StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
+    storeFlushCtx.prepare();
+
+    // insert current data into active -- phase (2/4)
+    myStore.add(createCell(qf1, ts + 1, seqId + 1, currentValue), memStoreSize);
+    myStore.add(createCell(qf2, ts + 1, seqId + 1, currentValue), memStoreSize);
+    myStore.add(createCell(qf3, ts + 1, seqId + 1, currentValue), memStoreSize);
+    TreeSet<byte[]> quals = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+    quals.add(qf1);
+    quals.add(qf2);
+    quals.add(qf3);
+    try (InternalScanner scanner = (InternalScanner) myStore.getScanner(
+        new Scan(new Get(row)), quals, seqId + 1)) {
+      // complete the flush -- phase (3/4)
+      storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+      storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
+
+      List<Cell> results = new ArrayList<>();
+      scanner.next(results);
+      assertEquals(3, results.size());
+      for (Cell c : results) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(currentValue)
+          + ", actual:" + Bytes.toStringBinary(actualValue)
+          , Bytes.equals(actualValue, currentValue));
+      }
+    }
+  }
+
+  private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setMaxVersions(5);
+    return (MyStore) init(methodName, conf, htd, hcd, hook);
+  }
+
+  private static class MyStore extends HStore {
+    private final MyScannerHook hook;
+    MyStore(final HRegion region, final HColumnDescriptor family,
+        final Configuration confParam, MyScannerHook hook) throws IOException {
+      super(region, family, confParam);
+      this.hook = hook;
+    }
+
+    @Override
+    public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
+      boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+      boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt,
+      boolean includeMemstoreScanner) throws IOException {
+      hook.hook(this);
+      return super.getScanners(files, cacheBlocks, usePread,
+          isCompaction, matcher, startRow, true, stopRow, false, readPt, includeMemstoreScanner);
+    }
+  }
+  private interface MyScannerHook {
+    void hook(MyStore store) throws IOException;
+  }
+
+  public static class MyCompactingMemStore extends CompactingMemStore {
+    private static final AtomicBoolean START_TEST = new AtomicBoolean(false);
+    private final CountDownLatch getScannerLatch = new CountDownLatch(1);
+    private final CountDownLatch snapshotLatch = new CountDownLatch(1);
+    public MyCompactingMemStore(Configuration conf, CellComparator c,
+        HStore store, RegionServicesForStores regionServices,
+        MemoryCompactionPolicy compactionPolicy) throws IOException {
+      super(conf, c, store, regionServices, compactionPolicy);
+    }
+
+    @Override
+    protected List<KeyValueScanner> createList(int capacity) {
+      if (START_TEST.get()) {
+        try {
+          getScannerLatch.countDown();
+          snapshotLatch.await();
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      return new ArrayList<>(capacity);
+    }
+    @Override
+    protected void pushActiveToPipeline(MutableSegment active) {
+      if (START_TEST.get()) {
+        try {
+          getScannerLatch.await();
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      super.pushActiveToPipeline(active);
+      if (START_TEST.get()) {
+        snapshotLatch.countDown();
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/b34ab598/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
index 524af34..040a53e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
@@ -861,9 +862,9 @@ public class TestStoreScanner {
       // normally cause an NPE because scan.store is null.  So as long as we get through these
       // two calls we are good and the bug was quashed.
 
-      scan.updateReaders(new ArrayList<>());
+      scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
 
-      scan.updateReaders(new ArrayList<>());
+      scan.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
 
       scan.peek();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b34ab598/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
index cdf84d2..9514f9c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
@@ -130,7 +131,7 @@ public class TestWideScanner extends HBaseTestCase {
           ((HRegion.RegionScannerImpl)s).storeHeap.getHeap().iterator();
         while (scanners.hasNext()) {
           StoreScanner ss = (StoreScanner)scanners.next();
-          ss.updateReaders(new ArrayList<>());
+          ss.updateReaders(Collections.EMPTY_LIST, Collections.EMPTY_LIST);
         }
       } while (more);
 


Mime
View raw message