hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-18055 Releasing L2 cache HFileBlocks before shipped() when switching from pread to stream causes result corruption
Date Tue, 16 May 2017 13:21:07 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 37650775a -> ad9ffaaaf


HBASE-18055 Releasing L2 cache HFileBlocks before shipped() when switching from pread to stream
causes result corruption


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ad9ffaaa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ad9ffaaa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ad9ffaaa

Branch: refs/heads/master
Commit: ad9ffaaafd074423b54401839da235fe976070a4
Parents: 3765077
Author: zhangduo <zhangduo@apache.org>
Authored: Tue May 16 17:01:37 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Tue May 16 21:16:36 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/StoreScanner.java | 76 +++++++++++---------
 .../regionserver/TestSwitchToStreamRead.java    |  2 +
 2 files changed, 43 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ad9ffaaa/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index e9feb37..953e911 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -146,7 +146,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private volatile boolean flushed = false;
   // generally we get one file from a flush
   private final List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
-  // generally we get one memstroe scanner from a flush
+  // generally we get one memstore scanner from a flush
   private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(1);
   // The current list of scanners
   private final List<KeyValueScanner> currentScanners = new ArrayList<>();
@@ -539,7 +539,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     if (scannerContext == null) {
       throw new IllegalArgumentException("Scanner context cannot be null");
     }
-    trySwitchToStreamRead();
     if (checkFlushed() && reopenAfterFlush()) {
       return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
     }
@@ -954,7 +953,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     return heap.reseek(kv);
   }
 
-  private void trySwitchToStreamRead() throws IOException {
+  private void trySwitchToStreamRead() {
     if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null
||
         bytesRead < preadMaxBytes) {
       return;
@@ -971,44 +970,45 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
     List<StoreFile> filesToReopen = new ArrayList<>();
     List<KeyValueScanner> memstoreScanners = new ArrayList<>();
-    List<KeyValueScanner> fileScanners = null;
     List<KeyValueScanner> scannersToClose = new ArrayList<>();
-    boolean succ = false;
-    try {
-      for (KeyValueScanner kvs : currentScanners) {
-        if (!kvs.isFileScanner()) {
-          memstoreScanners.add(kvs);
-        } else {
-          scannersToClose.add(kvs);
-          if (kvs.peek() == null) {
-            continue;
-          }
-          filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
+    for (KeyValueScanner kvs : currentScanners) {
+      if (!kvs.isFileScanner()) {
+        memstoreScanners.add(kvs);
+      } else {
+        scannersToClose.add(kvs);
+        if (kvs.peek() == null) {
+          continue;
         }
+        filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
       }
-      if (filesToReopen.isEmpty()) {
-        return;
-      }
+    }
+    if (filesToReopen.isEmpty()) {
+      return;
+    }
+    List<KeyValueScanner> fileScanners = null;
+    List<KeyValueScanner> newCurrentScanners;
+    KeyValueHeap newHeap;
+    try {
       fileScanners =
           store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),
             scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);
       seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
-      currentScanners.clear();
-      addCurrentScanners(fileScanners);
-      addCurrentScanners(memstoreScanners);
-      resetKVHeap(currentScanners, store.getComparator());
-      resetQueryMatcher(lastTop);
-      for (KeyValueScanner kvs : scannersToClose) {
-        kvs.close();
-      }
-      succ = true;
-    } finally {
-      if (!succ && fileScanners != null) {
-        for (KeyValueScanner scanner : fileScanners) {
-          scanner.close();
-        }
+      newCurrentScanners = new ArrayList<>(fileScanners.size() + memstoreScanners.size());
+      newCurrentScanners.addAll(fileScanners);
+      newCurrentScanners.addAll(memstoreScanners);
+      newHeap = new KeyValueHeap(newCurrentScanners, store.getComparator());
+    } catch (Exception e) {
+      LOG.warn("failed to switch to stream read", e);
+      if (fileScanners != null) {
+        fileScanners.forEach(KeyValueScanner::close);
       }
+      return;
     }
+    currentScanners.clear();
+    addCurrentScanners(newCurrentScanners);
+    this.heap = newHeap;
+    resetQueryMatcher(lastTop);
+    scannersToClose.forEach(KeyValueScanner::close);
   }
 
   protected final boolean checkFlushed() {
@@ -1117,12 +1117,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       prevCell = KeyValueUtil.toNewKeyCell(this.prevCell);
     }
     matcher.beforeShipped();
-    for (KeyValueHeap h : this.heapsForDelayedClose) {
-      h.close();// There wont be further fetch of Cells from these scanners. Just close.
-    }
+    // There wont be further fetch of Cells from these scanners. Just close.
+    this.heapsForDelayedClose.forEach(KeyValueHeap::close);
     this.heapsForDelayedClose.clear();
     if (this.heap != null) {
       this.heap.shipped();
+      // When switching from pread to stream, we will open a new scanner for each store file,
but
+      // the old scanner may still track the HFileBlocks we have scanned but not sent back
to client
+      // yet. If we close the scanner immediately then the HFileBlocks may be messed up by
others
+      // before we serialize and send it back to client. The HFileBlocks will be released
in shipped
+      // method, so we here will also open new scanners and close old scanners in shipped
method.
+      // See HBASE-18055 for more details.
+      trySwitchToStreamRead();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad9ffaaa/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
index fb978b1..767ad2e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
@@ -104,6 +104,7 @@ public class TestSwitchToStreamRead {
         Result result = Result.create(cells);
         assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
         cells.clear();
+        scanner.shipped();
       }
       for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
         if (kvs instanceof StoreFileScanner) {
@@ -117,6 +118,7 @@ public class TestSwitchToStreamRead {
         Result result = Result.create(cells);
         assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
         cells.clear();
+        scanner.shipped();
       }
     }
     // make sure all scanners are closed.


Mime
View raw message