hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chia7...@apache.org
Subject hbase git commit: HBASE-18295 The result contains the cells across different rows
Date Tue, 11 Jul 2017 18:46:34 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 44651e52d -> bec34ae43


HBASE-18295  The result contains the cells across different rows


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/bec34ae4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/bec34ae4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/bec34ae4

Branch: refs/heads/branch-1
Commit: bec34ae432920b37821a313d86a37a0c960aa9dd
Parents: 44651e5
Author: Chia-Ping Tsai <chia7712@gmail.com>
Authored: Wed Jul 12 02:46:13 2017 +0800
Committer: Chia-Ping Tsai <chia7712@gmail.com>
Committed: Wed Jul 12 02:46:13 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/StoreScanner.java |  37 +++-
 .../hadoop/hbase/regionserver/TestStore.java    | 179 +++++++++++++++++--
 2 files changed, 204 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/bec34ae4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index d3a1e49..7c34b87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -142,6 +142,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private ReentrantLock flushLock = new ReentrantLock();
 
   private final long readPt;
+  private boolean topChanged = false;
 
   // used by the injection framework to test race between StoreScanner construction and compaction
   enum StoreScannerCompactionRace {
@@ -547,7 +548,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
       checkScanOrder(prevCell, cell, comparator);
       prevCell = cell;
-
+      topChanged = false;
       ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
       switch (qcode) {
         case INCLUDE:
@@ -644,10 +645,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
           // another compareRow to say the current row is DONE
           matcher.clearCurrentRow();
           seekOrSkipToNextRow(cell);
+          NextState stateAfterSeekNextRow = needToReturn(outResult);
+          if (stateAfterSeekNextRow != null) {
+            return scannerContext.setScannerState(stateAfterSeekNextRow).hasMoreValues();
+          }
           break;
 
         case SEEK_NEXT_COL:
           seekOrSkipToNextColumn(cell);
+          NextState stateAfterSeekNextColumn = needToReturn(outResult);
+          if (stateAfterSeekNextColumn != null) {
+            return scannerContext.setScannerState(stateAfterSeekNextColumn).hasMoreValues();
+          }
           break;
 
         case SKIP:
@@ -658,6 +667,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
           Cell nextKV = matcher.getNextKeyHint(cell);
           if (nextKV != null) {
             seekAsDirection(nextKV);
+            NextState stateAfterSeekByHint = needToReturn(outResult);
+            if (stateAfterSeekByHint != null) {
+              return scannerContext.setScannerState(stateAfterSeekByHint).hasMoreValues();
+            }
           } else {
             heap.next();
           }
@@ -677,6 +690,24 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
   }
 
+  /**
+   * If the top cell is not flushed to disk, the top cell may change
+   * after #reopenAfterFlush, because the older top cell exists only
+   * in the memstore scanner, and the memstore scanner is replaced by an
+   * hfile scanner after #reopenAfterFlush. If the row of the top cell has
+   * changed, we should return the current cells. Otherwise, we may return
+   * cells across different rows.
+   * @param outResult the cells which are visible for user scan
+   * @return null if the top cell doesn't change. Otherwise, the NextState
+   *         to return
+   */
+  private NextState needToReturn(List<Cell> outResult) {
+    if (!outResult.isEmpty() && topChanged) {
+      return heap.peek() == null ? NextState.NO_MORE_VALUES : NextState.MORE_VALUES;
+    }
+    return null;
+  }
+
   private void seekOrSkipToNextRow(Cell cell) throws IOException {
     // If it is a Get Scan, then we know that we are done with this row; there are no more
     // rows beyond the current one: don't try to optimize.
@@ -846,14 +877,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       resetScannerStack(this.lastTop);
       if (this.heap.peek() == null
           || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
-        LOG.debug("Storescanner.peek() is changed where before = "
+        LOG.info("Storescanner.peek() is changed where before = "
             + this.lastTop.toString() + ",and after = " + this.heap.peek());
         this.lastTop = null;
+        topChanged = true;
         return true;
       }
       this.lastTop = null; // gone!
     }
     // else dont need to reseek
+    topChanged = false;
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/bec34ae4/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 573de11..efc9bbd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -68,6 +68,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -93,6 +95,8 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.util.Progressable;
 import org.junit.After;
 import org.junit.Assert;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -178,7 +182,7 @@ public class TestStore {
 
   @SuppressWarnings("deprecation")
   private Store init(String methodName, Configuration conf, HTableDescriptor htd,
-      HColumnDescriptor hcd, MyScannerHook hook) throws IOException {
+      HColumnDescriptor hcd, MyStoreHook hook) throws IOException {
     //Setting up a Store
     Path basedir = new Path(DIR+methodName);
     Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName());
@@ -1154,6 +1158,10 @@ public class TestStore {
   }
 
   private Cell createCell(byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
+    return createCell(row, qualifier, ts, sequenceId, value);
+  }
+
+  private Cell createCell(byte[] row, byte[] qualifier, long ts, long sequenceId, byte[] value) throws IOException {
     Cell c = CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put.getCode(), value);
     CellUtil.setSequenceId(c, sequenceId);
     return c;
@@ -1163,9 +1171,9 @@ public class TestStore {
   public void testScanWithDoubleFlush() throws IOException {
     Configuration conf = HBaseConfiguration.create();
     // Initialize region
-    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyScannerHook() {
+    MyStore myStore = initMyStore(name.getMethodName(), conf, new MyStoreHook() {
       @Override
-      public void hook(final MyStore store) throws IOException {
+      public void getScanners(final MyStore store) throws IOException {
         final long tmpId = id++;
         ExecutorService s = Executors.newSingleThreadExecutor();
         s.submit(new Runnable() {
@@ -1231,6 +1239,149 @@ public class TestStore {
   }
 
   @Test
+  public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
+    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
+    testFlushBeforeCompletingScan(new MyListHook() {
+      @Override
+      public void hook(int currentSize) {
+        if (currentSize == 2) {
+          try {
+            flushStore(store, id++);
+            timeToGoNextRow.set(true);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }, new FilterBase() {
+      @Override
+      public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
+        return Filter.ReturnCode.INCLUDE;
+      }
+    });
+  }
+
+  @Test
+  public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
+    final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
+    testFlushBeforeCompletingScan(new MyListHook() {
+      @Override
+      public void hook(int currentSize) {
+        if (currentSize == 2) {
+          try {
+            flushStore(store, id++);
+            timeToGoNextRow.set(true);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }, new FilterBase() {
+      @Override
+      public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
+        if (timeToGoNextRow.get()) {
+          timeToGoNextRow.set(false);
+          return Filter.ReturnCode.NEXT_ROW;
+        } else {
+          return Filter.ReturnCode.INCLUDE;
+        }
+      }
+    });
+  }
+
+  @Test
+  public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException {
+    final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
+    testFlushBeforeCompletingScan(new MyListHook() {
+      @Override
+      public void hook(int currentSize) {
+        if (currentSize == 2) {
+          try {
+            flushStore(store, id++);
+            timeToGetHint.set(true);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }, new FilterBase() {
+      @Override
+      public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
+        if (timeToGetHint.get()) {
+          timeToGetHint.set(false);
+          return Filter.ReturnCode.SEEK_NEXT_USING_HINT;
+        } else {
+          return Filter.ReturnCode.INCLUDE;
+        }
+      }
+      @Override
+      public Cell getNextCellHint(Cell currentCell) throws IOException {
+        return currentCell;
+      }
+    });
+  }
+
+  private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter)
+          throws IOException, InterruptedException {
+    Configuration conf = HBaseConfiguration.create();
+    HColumnDescriptor hcd = new HColumnDescriptor(family);
+    hcd.setMaxVersions(1);
+    byte[] r0 = Bytes.toBytes("row0");
+    byte[] r1 = Bytes.toBytes("row1");
+    byte[] r2 = Bytes.toBytes("row2");
+    byte[] value0 = Bytes.toBytes("value0");
+    byte[] value1 = Bytes.toBytes("value1");
+    byte[] value2 = Bytes.toBytes("value2");
+    long ts = EnvironmentEdgeManager.currentTime();
+    final long seqId = 100;
+    init(name.getMethodName(), conf, new HTableDescriptor(TableName.valueOf(table)), hcd, new MyStoreHook() {
+      @Override
+      public long getSmallestReadPoint(HStore store) {
+        return seqId + 3;
+      }
+    });
+    // The cells having the value0 won't be flushed to disk because the value of max version is 1
+    store.add(createCell(r0, qf1, ts, seqId, value0));
+    store.add(createCell(r0, qf2, ts, seqId, value0));
+    store.add(createCell(r0, qf3, ts, seqId, value0));
+    store.add(createCell(r1, qf1, ts + 1, seqId + 1, value1));
+    store.add(createCell(r1, qf2, ts + 1, seqId + 1, value1));
+    store.add(createCell(r1, qf3, ts + 1, seqId + 1, value1));
+    store.add(createCell(r2, qf1, ts + 2, seqId + 2, value2));
+    store.add(createCell(r2, qf2, ts + 2, seqId + 2, value2));
+    store.add(createCell(r2, qf3, ts + 2, seqId + 2, value2));
+    store.add(createCell(r1, qf1, ts + 3, seqId + 3, value1));
+    store.add(createCell(r1, qf2, ts + 3, seqId + 3, value1));
+    store.add(createCell(r1, qf3, ts + 3, seqId + 3, value1));
+    List<Cell> myList = new MyList<>(hook);
+    Scan scan = new Scan()
+            .withStartRow(r1)
+            .setFilter(filter);
+    try (InternalScanner scanner = (InternalScanner) store.getScanner(
+          scan, null, seqId + 3)){
+      // r1
+      scanner.next(myList);
+      assertEquals(3, myList.size());
+      for (Cell c : myList) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(value1)
+          + ", actual:" + Bytes.toStringBinary(actualValue)
+          , Bytes.equals(actualValue, value1));
+      }
+      List<Cell> normalList = new ArrayList<>(3);
+      // r2
+      scanner.next(normalList);
+      assertEquals(3, normalList.size());
+      for (Cell c : normalList) {
+        byte[] actualValue = CellUtil.cloneValue(c);
+        assertTrue("expected:" + Bytes.toStringBinary(value2)
+          + ", actual:" + Bytes.toStringBinary(actualValue)
+          , Bytes.equals(actualValue, value2));
+      }
+    }
+  }
+
+  @Test
   public void testReclaimChunkWhenScaning() throws IOException {
     Configuration conf = HBaseConfiguration.create();
     conf.setFloat(CHUNK_POOL_MAXSIZE_KEY, 1);
@@ -1291,7 +1442,7 @@ public class TestStore {
     }
   }
 
-  private MyStore initMyStore(String methodName, Configuration conf, MyScannerHook hook) throws IOException {
+  private MyStore initMyStore(String methodName, Configuration conf, MyStoreHook hook) throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table));
     HColumnDescriptor hcd = new HColumnDescriptor(family);
     hcd.setMaxVersions(5);
@@ -1300,10 +1451,10 @@ public class TestStore {
 
   private static class MyStore extends HStore {
 
-    private final MyScannerHook hook;
+    private final MyStoreHook hook;
 
     MyStore(final HRegion region, final HColumnDescriptor family,
-            final Configuration confParam, MyScannerHook hook) throws IOException {
+            final Configuration confParam, MyStoreHook hook) throws IOException {
       super(region, family, confParam);
       this.hook = hook;
     }
@@ -1312,15 +1463,23 @@ public class TestStore {
     public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks,
             boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher,
             byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) throws IOException {
-      hook.hook(this);
+      hook.getScanners(this);
       return super.getScanners(files, cacheBlocks, isGet, usePread,
               isCompaction, matcher, startRow, stopRow, readPt, includeMemstoreScanner);
     }
-  }
 
-  private interface MyScannerHook {
+    @Override
+    public long getSmallestReadPoint() {
+      return hook.getSmallestReadPoint(this);
+    }
+  }
 
-    void hook(MyStore store) throws IOException;
+  private abstract class MyStoreHook {
+    void getScanners(MyStore store) throws IOException {
+    }
+    long getSmallestReadPoint(HStore store) {
+      return store.getHRegion().getSmallestReadPoint();
+    }
   }
 
   interface MyListHook {


Mime
View raw message