Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 58B34185BB for ; Tue, 15 Dec 2015 16:54:53 +0000 (UTC) Received: (qmail 92779 invoked by uid 500); 15 Dec 2015 16:54:52 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 92520 invoked by uid 500); 15 Dec 2015 16:54:52 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 92043 invoked by uid 99); 15 Dec 2015 16:54:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Dec 2015 16:54:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DDF48E0AF9; Tue, 15 Dec 2015 16:54:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: misty@apache.org To: commits@hbase.apache.org Date: Tue, 15 Dec 2015 16:54:59 -0000 Message-Id: In-Reply-To: <8bcc8a759d9d4b0a801d815c4977fc28@git.apache.org> References: <8bcc8a759d9d4b0a801d815c4977fc28@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/13] hbase-site git commit: Published site at ef92a6a067230cae96d3e3267da5a18ac5ad89c6. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/4e8775f9/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.StoreScannerCompactionRace.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.StoreScannerCompactionRace.html b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.StoreScannerCompactionRace.html index 0f3725c..8d9efca 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.StoreScannerCompactionRace.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/regionserver/StoreScanner.StoreScannerCompactionRace.html @@ -422,498 +422,497 @@ 414 415 @Override 416 public Cell peek() { -417 checkFlushed(); -418 if (this.heap == null) { -419 return this.lastTop; -420 } -421 return this.heap.peek(); -422 } -423 -424 @Override -425 public KeyValue next() { -426 // throw runtime exception perhaps? -427 throw new RuntimeException("Never call StoreScanner.next()"); -428 } -429 -430 @Override -431 public void close() { -432 close(true); -433 } -434 -435 private void close(boolean withHeapClose) { -436 if (this.closing) { -437 return; -438 } -439 if (withHeapClose) this.closing = true; -440 // Under test, we dont have a this.store -441 if (this.store != null) this.store.deleteChangedReaderObserver(this); -442 if (withHeapClose) { -443 for (KeyValueHeap h : this.heapsForDelayedClose) { -444 h.close(); -445 } -446 this.heapsForDelayedClose.clear(); -447 if (this.heap != null) { -448 this.heap.close(); -449 this.currentScanners.clear(); -450 this.heap = null; // CLOSED! -451 } -452 } else { -453 if (this.heap != null) { -454 this.heapsForDelayedClose.add(this.heap); -455 this.currentScanners.clear(); -456 this.heap = null; -457 } -458 } -459 this.lastTop = null; // If both are null, we are closed. -460 } -461 -462 @Override -463 public boolean seek(Cell key) throws IOException { -464 boolean flushed = checkFlushed(); -465 // reset matcher state, in case that underlying store changed -466 checkReseek(flushed); -467 return this.heap.seek(key); -468 } -469 -470 @Override -471 public boolean next(List<Cell> outResult) throws IOException { -472 return next(outResult, NoLimitScannerContext.getInstance()); -473 } -474 -475 /** -476 * Get the next row of values from this Store. -477 * @param outResult -478 * @param scannerContext -479 * @return true if there are more rows, false if scanner is done -480 */ -481 @Override -482 public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException { -483 if (scannerContext == null) { -484 throw new IllegalArgumentException("Scanner context cannot be null"); -485 } -486 boolean flushed = checkFlushed(); -487 if (checkReseek(flushed)) { -488 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); -489 } -490 -491 // if the heap was left null, then the scanners had previously run out anyways, close and -492 // return. -493 if (this.heap == null) { -494 // By this time partial close should happened because already heap is null -495 close(false);// Do all cleanup except heap.close() -496 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); -497 } -498 -499 Cell cell = this.heap.peek(); -500 if (cell == null) { -501 close(false);// Do all cleanup except heap.close() -502 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); -503 } -504 -505 // only call setRow if the row changes; avoids confusing the query matcher -506 // if scanning intra-row -507 -508 // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing -509 // rows. Else it is possible we are still traversing the same row so we must perform the row -510 // comparison. -511 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null -512 || !CellUtil.matchingRow(cell, matcher.curCell)) { -513 this.countPerRow = 0; -514 matcher.setToNewRow(cell); -515 } -516 -517 // Clear progress away unless invoker has indicated it should be kept. -518 if (!scannerContext.getKeepProgress()) scannerContext.clearProgress(); -519 -520 // Only do a sanity-check if store and comparator are available. -521 CellComparator comparator = store != null ? store.getComparator() : null; -522 -523 int count = 0; -524 long totalBytesRead = 0; -525 -526 LOOP: do { -527 // Update and check the time limit based on the configured value of cellsPerTimeoutCheck -528 if ((kvsScanned % cellsPerHeartbeatCheck == 0)) { -529 scannerContext.updateTimeProgress(); -530 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { -531 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); -532 } -533 } -534 -535 if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. -536 checkScanOrder(prevCell, cell, comparator); -537 prevCell = cell; -538 -539 ScanQueryMatcher.MatchCode qcode = matcher.match(cell); -540 qcode = optimize(qcode, cell); -541 switch (qcode) { -542 case INCLUDE: -543 case INCLUDE_AND_SEEK_NEXT_ROW: -544 case INCLUDE_AND_SEEK_NEXT_COL: -545 -546 Filter f = matcher.getFilter(); -547 if (f != null) { -548 cell = f.transformCell(cell); -549 } -550 -551 this.countPerRow++; -552 if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) { -553 // do what SEEK_NEXT_ROW does. -554 if (!matcher.moreRowsMayExistAfter(cell)) { -555 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); -556 } -557 seekToNextRow(cell); -558 break LOOP; -559 } -560 -561 // add to results only if we have skipped #storeOffset kvs -562 // also update metric accordingly -563 if (this.countPerRow > storeOffset) { -564 outResult.add(cell); -565 -566 // Update local tracking information -567 count++; -568 totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell); -569 -570 // Update the progress of the scanner context -571 scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell)); -572 scannerContext.incrementBatchProgress(1); -573 -574 if (totalBytesRead > maxRowSize) { -575 throw new RowTooBigException( -576 "Max row size allowed: " + maxRowSize + ", but the row is bigger than that."); -577 } -578 } -579 -580 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { -581 if (!matcher.moreRowsMayExistAfter(cell)) { -582 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); -583 } -584 seekToNextRow(cell); -585 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { -586 seekAsDirection(matcher.getKeyForNextColumn(cell)); -587 } else { -588 this.heap.next(); -589 } -590 -591 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { -592 break LOOP; -593 } -594 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { -595 break LOOP; -596 } -597 continue; -598 -599 case DONE: -600 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); -601 -602 case DONE_SCAN: -603 close(false);// Do all cleanup except heap.close() -604 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); -605 -606 case SEEK_NEXT_ROW: -607 // This is just a relatively simple end of scan fix, to short-cut end -608 // us if there is an endKey in the scan. -609 if (!matcher.moreRowsMayExistAfter(cell)) { -610 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); -611 } -612 -613 seekToNextRow(cell); -614 break; -615 -616 case SEEK_NEXT_COL: -617 seekAsDirection(matcher.getKeyForNextColumn(cell)); -618 break; -619 -620 case SKIP: -621 this.heap.next(); -622 break; -623 -624 case SEEK_NEXT_USING_HINT: -625 Cell nextKV = matcher.getNextKeyHint(cell); -626 if (nextKV != null) { -627 seekAsDirection(nextKV); -628 } else { -629 heap.next(); -630 } -631 break; -632 -633 default: -634 throw new RuntimeException("UNEXPECTED"); -635 } -636 } while ((cell = this.heap.peek()) != null); -637 -638 if (count > 0) { -639 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); -640 } -641 -642 // No more keys -643 close(false);// Do all cleanup except heap.close() -644 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); -645 } -646 -647 /* -648 * See if we should actually SEEK or rather just SKIP to the next Cell. -649 * (see HBASE-13109) -650 */ -651 private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) { -652 switch(qcode) { -653 case INCLUDE_AND_SEEK_NEXT_COL: -654 case SEEK_NEXT_COL: -655 { -656 Cell nextIndexedKey = getNextIndexedKey(); -657 if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY -658 && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) { -659 return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE; -660 } -661 break; -662 } -663 case INCLUDE_AND_SEEK_NEXT_ROW: -664 case SEEK_NEXT_ROW: -665 { -666 Cell nextIndexedKey = getNextIndexedKey(); -667 if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY -668 && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) { -669 return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE; -670 } -671 break; -672 } -673 default: -674 break; -675 } -676 return qcode; -677 } -678 -679 // Implementation of ChangedReadersObserver -680 @Override -681 public void updateReaders(List<StoreFile> sfs) throws IOException { -682 flushed = true; -683 flushLock.lock(); -684 try { -685 flushedStoreFiles.addAll(sfs); -686 } finally { -687 flushLock.unlock(); -688 } -689 // Let the next() call handle re-creating and seeking -690 } -691 -692 /** -693 * @param flushed indicates if there was a flush -694 * @return true if top of heap has changed (and KeyValueHeap has to try the -695 * next KV) -696 * @throws IOException -697 */ -698 protected boolean checkReseek(boolean flushed) throws IOException { -699 if (flushed && this.lastTop != null) { -700 resetScannerStack(this.lastTop); -701 if (this.heap.peek() == null -702 || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) { -703 LOG.debug("Storescanner.peek() is changed where before = " -704 + this.lastTop.toString() + ",and after = " + this.heap.peek()); -705 this.lastTop = null; -706 return true; -707 } -708 this.lastTop = null; // gone! -709 } -710 // else dont need to reseek -711 return false; -712 } -713 -714 protected void resetScannerStack(Cell lastTopKey) throws IOException { -715 /* When we have the scan object, should we not pass it to getScanners() -716 * to get a limited set of scanners? We did so in the constructor and we -717 * could have done it now by storing the scan object from the constructor -718 */ -719 -720 final boolean isCompaction = false; -721 boolean usePread = get || scanUsePread; -722 List<KeyValueScanner> scanners = null; -723 try { -724 flushLock.lock(); -725 scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread, -726 isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true)); -727 // Clear the current set of flushed store files so that they don't get added again -728 flushedStoreFiles.clear(); -729 } finally { -730 flushLock.unlock(); -731 } -732 -733 // Seek the new scanners to the last key -734 seekScanners(scanners, lastTopKey, false, parallelSeekEnabled); -735 // remove the older memstore scanner -736 for (int i = 0; i < currentScanners.size(); i++) { -737 if (!currentScanners.get(i).isFileScanner()) { -738 currentScanners.remove(i); -739 break; -740 } -741 } -742 // add the newly created scanners on the flushed files and the current active memstore scanner -743 addCurrentScanners(scanners); -744 // Combine all seeked scanners with a heap -745 resetKVHeap(this.currentScanners, store.getComparator()); -746 // Reset the state of the Query Matcher and set to top row. -747 // Only reset and call setRow if the row changes; avoids confusing the -748 // query matcher if scanning intra-row. -749 Cell cell = heap.peek(); -750 if (cell == null) { -751 cell = lastTopKey; -752 } -753 if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) { -754 this.countPerRow = 0; -755 matcher.reset(); -756 matcher.setToNewRow(cell); -757 } -758 } -759 -760 /** -761 * Check whether scan as expected order -762 * @param prevKV -763 * @param kv -764 * @param comparator -765 * @throws IOException -766 */ -767 protected void checkScanOrder(Cell prevKV, Cell kv, -768 CellComparator comparator) throws IOException { -769 // Check that the heap gives us KVs in an increasing order. -770 assert prevKV == null || comparator == null -771 || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV -772 + " followed by a " + "smaller key " + kv + " in cf " + store; -773 } -774 -775 protected boolean seekToNextRow(Cell c) throws IOException { -776 return reseek(CellUtil.createLastOnRow(c)); -777 } -778 -779 /** -780 * Do a reseek in a normal StoreScanner(scan forward) -781 * @param kv -782 * @return true if scanner has values left, false if end of scanner -783 * @throws IOException -784 */ -785 protected boolean seekAsDirection(Cell kv) -786 throws IOException { -787 return reseek(kv); -788 } -789 -790 @Override -791 public boolean reseek(Cell kv) throws IOException { -792 boolean flushed = checkFlushed(); -793 // Heap will not be null, if this is called from next() which. -794 // If called from RegionScanner.reseek(...) make sure the scanner -795 // stack is reset if needed. -796 checkReseek(flushed); -797 if (explicitColumnQuery && lazySeekEnabledGlobally) { -798 return heap.requestSeek(kv, true, useRowColBloom); -799 } -800 return heap.reseek(kv); -801 } -802 -803 protected boolean checkFlushed() { -804 // check the var without any lock. Suppose even if we see the old -805 // value here still it is ok to continue because we will not be resetting -806 // the heap but will continue with the referenced memstore's snapshot. For compactions -807 // any way we don't need the updateReaders at all to happen as we still continue with -808 // the older files -809 if (flushed) { -810 // If there is a flush and the current scan is notified on the flush ensure that the -811 // scan's heap gets reset and we do a seek on the newly flushed file. -812 if(!this.closing) { -813 this.lastTop = this.heap.peek(); -814 } else { -815 return false; -816 } -817 // reset the flag -818 flushed = false; -819 return true; -820 } -821 return false; -822 } -823 -824 @Override -825 public long getSequenceID() { -826 return 0; -827 } -828 -829 /** -830 * Seek storefiles in parallel to optimize IO latency as much as possible -831 * @param scanners the list {@link KeyValueScanner}s to be read from -832 * @param kv the KeyValue on which the operation is being requested -833 * @throws IOException -834 */ -835 private void parallelSeek(final List<? extends KeyValueScanner> -836 scanners, final Cell kv) throws IOException { -837 if (scanners.isEmpty()) return; -838 int storeFileScannerCount = scanners.size(); -839 CountDownLatch latch = new CountDownLatch(storeFileScannerCount); -840 List<ParallelSeekHandler> handlers = -841 new ArrayList<ParallelSeekHandler>(storeFileScannerCount); -842 for (KeyValueScanner scanner : scanners) { -843 if (scanner instanceof StoreFileScanner) { -844 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, -845 this.readPt, latch); -846 executor.submit(seekHandler); -847 handlers.add(seekHandler); -848 } else { -849 scanner.seek(kv); -850 latch.countDown(); -851 } -852 } -853 -854 try { -855 latch.await(); -856 } catch (InterruptedException ie) { -857 throw (InterruptedIOException)new InterruptedIOException().initCause(ie); -858 } -859 -860 for (ParallelSeekHandler handler : handlers) { -861 if (handler.getErr() != null) { -862 throw new IOException(handler.getErr()); -863 } -864 } -865 } -866 -867 /** -868 * Used in testing. -869 * @return all scanners in no particular order -870 */ -871 List<KeyValueScanner> getAllScannersForTesting() { -872 List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>(); -873 KeyValueScanner current = heap.getCurrentForTesting(); -874 if (current != null) -875 allScanners.add(current); -876 for (KeyValueScanner scanner : heap.getHeap()) -877 allScanners.add(scanner); -878 return allScanners; -879 } -880 -881 static void enableLazySeekGlobally(boolean enable) { -882 lazySeekEnabledGlobally = enable; -883 } -884 -885 /** -886 * @return The estimated number of KVs seen by this scanner (includes some skipped KVs). -887 */ -888 public long getEstimatedNumberOfKvsScanned() { -889 return this.kvsScanned; -890 } -891 -892 @Override -893 public Cell getNextIndexedKey() { -894 return this.heap.getNextIndexedKey(); -895 } -896 -897 @Override -898 public void shipped() throws IOException { -899 for (KeyValueHeap h : this.heapsForDelayedClose) { -900 h.close();// There wont be further fetch of Cells from these scanners. Just close. -901 } -902 this.heapsForDelayedClose.clear(); -903 if (this.heap != null) { -904 this.heap.shipped(); -905 } -906 } -907} -908 +417 if (this.heap == null) { +418 return this.lastTop; +419 } +420 return this.heap.peek(); +421 } +422 +423 @Override +424 public KeyValue next() { +425 // throw runtime exception perhaps? +426 throw new RuntimeException("Never call StoreScanner.next()"); +427 } +428 +429 @Override +430 public void close() { +431 close(true); +432 } +433 +434 private void close(boolean withHeapClose) { +435 if (this.closing) { +436 return; +437 } +438 if (withHeapClose) this.closing = true; +439 // Under test, we dont have a this.store +440 if (this.store != null) this.store.deleteChangedReaderObserver(this); +441 if (withHeapClose) { +442 for (KeyValueHeap h : this.heapsForDelayedClose) { +443 h.close(); +444 } +445 this.heapsForDelayedClose.clear(); +446 if (this.heap != null) { +447 this.heap.close(); +448 this.currentScanners.clear(); +449 this.heap = null; // CLOSED! +450 } +451 } else { +452 if (this.heap != null) { +453 this.heapsForDelayedClose.add(this.heap); +454 this.currentScanners.clear(); +455 this.heap = null; +456 } +457 } +458 this.lastTop = null; // If both are null, we are closed. +459 } +460 +461 @Override +462 public boolean seek(Cell key) throws IOException { +463 boolean flushed = checkFlushed(); +464 // reset matcher state, in case that underlying store changed +465 checkReseek(flushed); +466 return this.heap.seek(key); +467 } +468 +469 @Override +470 public boolean next(List<Cell> outResult) throws IOException { +471 return next(outResult, NoLimitScannerContext.getInstance()); +472 } +473 +474 /** +475 * Get the next row of values from this Store. +476 * @param outResult +477 * @param scannerContext +478 * @return true if there are more rows, false if scanner is done +479 */ +480 @Override +481 public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException { +482 if (scannerContext == null) { +483 throw new IllegalArgumentException("Scanner context cannot be null"); +484 } +485 boolean flushed = checkFlushed(); +486 if (checkReseek(flushed)) { +487 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); +488 } +489 +490 // if the heap was left null, then the scanners had previously run out anyways, close and +491 // return. +492 if (this.heap == null) { +493 // By this time partial close should happened because already heap is null +494 close(false);// Do all cleanup except heap.close() +495 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); +496 } +497 +498 Cell cell = this.heap.peek(); +499 if (cell == null) { +500 close(false);// Do all cleanup except heap.close() +501 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); +502 } +503 +504 // only call setRow if the row changes; avoids confusing the query matcher +505 // if scanning intra-row +506 +507 // If no limits exists in the scope LimitScope.Between_Cells then we are sure we are changing +508 // rows. Else it is possible we are still traversing the same row so we must perform the row +509 // comparison. +510 if (!scannerContext.hasAnyLimit(LimitScope.BETWEEN_CELLS) || matcher.curCell == null +511 || !CellUtil.matchingRow(cell, matcher.curCell)) { +512 this.countPerRow = 0; +513 matcher.setToNewRow(cell); +514 } +515 +516 // Clear progress away unless invoker has indicated it should be kept. +517 if (!scannerContext.getKeepProgress()) scannerContext.clearProgress(); +518 +519 // Only do a sanity-check if store and comparator are available. +520 CellComparator comparator = store != null ? store.getComparator() : null; +521 +522 int count = 0; +523 long totalBytesRead = 0; +524 +525 LOOP: do { +526 // Update and check the time limit based on the configured value of cellsPerTimeoutCheck +527 if ((kvsScanned % cellsPerHeartbeatCheck == 0)) { +528 scannerContext.updateTimeProgress(); +529 if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) { +530 return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); +531 } +532 } +533 +534 if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. +535 checkScanOrder(prevCell, cell, comparator); +536 prevCell = cell; +537 +538 ScanQueryMatcher.MatchCode qcode = matcher.match(cell); +539 qcode = optimize(qcode, cell); +540 switch (qcode) { +541 case INCLUDE: +542 case INCLUDE_AND_SEEK_NEXT_ROW: +543 case INCLUDE_AND_SEEK_NEXT_COL: +544 +545 Filter f = matcher.getFilter(); +546 if (f != null) { +547 cell = f.transformCell(cell); +548 } +549 +550 this.countPerRow++; +551 if (storeLimit > -1 && this.countPerRow > (storeLimit + storeOffset)) { +552 // do what SEEK_NEXT_ROW does. +553 if (!matcher.moreRowsMayExistAfter(cell)) { +554 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); +555 } +556 seekToNextRow(cell); +557 break LOOP; +558 } +559 +560 // add to results only if we have skipped #storeOffset kvs +561 // also update metric accordingly +562 if (this.countPerRow > storeOffset) { +563 outResult.add(cell); +564 +565 // Update local tracking information +566 count++; +567 totalBytesRead += CellUtil.estimatedSerializedSizeOf(cell); +568 +569 // Update the progress of the scanner context +570 scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell)); +571 scannerContext.incrementBatchProgress(1); +572 +573 if (totalBytesRead > maxRowSize) { +574 throw new RowTooBigException( +575 "Max row size allowed: " + maxRowSize + ", but the row is bigger than that."); +576 } +577 } +578 +579 if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) { +580 if (!matcher.moreRowsMayExistAfter(cell)) { +581 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); +582 } +583 seekToNextRow(cell); +584 } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) { +585 seekAsDirection(matcher.getKeyForNextColumn(cell)); +586 } else { +587 this.heap.next(); +588 } +589 +590 if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) { +591 break LOOP; +592 } +593 if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) { +594 break LOOP; +595 } +596 continue; +597 +598 case DONE: +599 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); +600 +601 case DONE_SCAN: +602 close(false);// Do all cleanup except heap.close() +603 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); +604 +605 case SEEK_NEXT_ROW: +606 // This is just a relatively simple end of scan fix, to short-cut end +607 // us if there is an endKey in the scan. +608 if (!matcher.moreRowsMayExistAfter(cell)) { +609 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); +610 } +611 +612 seekToNextRow(cell); +613 break; +614 +615 case SEEK_NEXT_COL: +616 seekAsDirection(matcher.getKeyForNextColumn(cell)); +617 break; +618 +619 case SKIP: +620 this.heap.next(); +621 break; +622 +623 case SEEK_NEXT_USING_HINT: +624 Cell nextKV = matcher.getNextKeyHint(cell); +625 if (nextKV != null) { +626 seekAsDirection(nextKV); +627 } else { +628 heap.next(); +629 } +630 break; +631 +632 default: +633 throw new RuntimeException("UNEXPECTED"); +634 } +635 } while ((cell = this.heap.peek()) != null); +636 +637 if (count > 0) { +638 return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); +639 } +640 +641 // No more keys +642 close(false);// Do all cleanup except heap.close() +643 return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); +644 } +645 +646 /* +647 * See if we should actually SEEK or rather just SKIP to the next Cell. +648 * (see HBASE-13109) +649 */ +650 private ScanQueryMatcher.MatchCode optimize(ScanQueryMatcher.MatchCode qcode, Cell cell) { +651 switch(qcode) { +652 case INCLUDE_AND_SEEK_NEXT_COL: +653 case SEEK_NEXT_COL: +654 { +655 Cell nextIndexedKey = getNextIndexedKey(); +656 if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY +657 && matcher.compareKeyForNextColumn(nextIndexedKey, cell) >= 0) { +658 return qcode == MatchCode.SEEK_NEXT_COL ? MatchCode.SKIP : MatchCode.INCLUDE; +659 } +660 break; +661 } +662 case INCLUDE_AND_SEEK_NEXT_ROW: +663 case SEEK_NEXT_ROW: +664 { +665 Cell nextIndexedKey = getNextIndexedKey(); +666 if (nextIndexedKey != null && nextIndexedKey != HConstants.NO_NEXT_INDEXED_KEY +667 && matcher.compareKeyForNextRow(nextIndexedKey, cell) >= 0) { +668 return qcode == MatchCode.SEEK_NEXT_ROW ? MatchCode.SKIP : MatchCode.INCLUDE; +669 } +670 break; +671 } +672 default: +673 break; +674 } +675 return qcode; +676 } +677 +678 // Implementation of ChangedReadersObserver +679 @Override +680 public void updateReaders(List<StoreFile> sfs) throws IOException { +681 flushed = true; +682 flushLock.lock(); +683 try { +684 flushedStoreFiles.addAll(sfs); +685 } finally { +686 flushLock.unlock(); +687 } +688 // Let the next() call handle re-creating and seeking +689 } +690 +691 /** +692 * @param flushed indicates if there was a flush +693 * @return true if top of heap has changed (and KeyValueHeap has to try the +694 * next KV) +695 * @throws IOException +696 */ +697 protected boolean checkReseek(boolean flushed) throws IOException { +698 if (flushed && this.lastTop != null) { +699 resetScannerStack(this.lastTop); +700 if (this.heap.peek() == null +701 || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) { +702 LOG.debug("Storescanner.peek() is changed where before = " +703 + this.lastTop.toString() + ",and after = " + this.heap.peek()); +704 this.lastTop = null; +705 return true; +706 } +707 this.lastTop = null; // gone! +708 } +709 // else dont need to reseek +710 return false; +711 } +712 +713 protected void resetScannerStack(Cell lastTopKey) throws IOException { +714 /* When we have the scan object, should we not pass it to getScanners() +715 * to get a limited set of scanners? We did so in the constructor and we +716 * could have done it now by storing the scan object from the constructor +717 */ +718 +719 final boolean isCompaction = false; +720 boolean usePread = get || scanUsePread; +721 List<KeyValueScanner> scanners = null; +722 try { +723 flushLock.lock(); +724 scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, usePread, +725 isCompaction, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true)); +726 // Clear the current set of flushed store files so that they don't get added again +727 flushedStoreFiles.clear(); +728 } finally { +729 flushLock.unlock(); +730 } +731 +732 // Seek the new scanners to the last key +733 seekScanners(scanners, lastTopKey, false, parallelSeekEnabled); +734 // remove the older memstore scanner +735 for (int i = 0; i < currentScanners.size(); i++) { +736 if (!currentScanners.get(i).isFileScanner()) { +737 currentScanners.remove(i); +738 break; +739 } +740 } +741 // add the newly created scanners on the flushed files and the current active memstore scanner +742 addCurrentScanners(scanners); +743 // Combine all seeked scanners with a heap +744 resetKVHeap(this.currentScanners, store.getComparator()); +745 // Reset the state of the Query Matcher and set to top row. +746 // Only reset and call setRow if the row changes; avoids confusing the +747 // query matcher if scanning intra-row. +748 Cell cell = heap.peek(); +749 if (cell == null) { +750 cell = lastTopKey; +751 } +752 if ((matcher.curCell == null) || !CellUtil.matchingRows(cell, matcher.curCell)) { +753 this.countPerRow = 0; +754 matcher.reset(); +755 matcher.setToNewRow(cell); +756 } +757 } +758 +759 /** +760 * Check whether scan as expected order +761 * @param prevKV +762 * @param kv +763 * @param comparator +764 * @throws IOException +765 */ +766 protected void checkScanOrder(Cell prevKV, Cell kv, +767 CellComparator comparator) throws IOException { +768 // Check that the heap gives us KVs in an increasing order. +769 assert prevKV == null || comparator == null +770 || comparator.compare(prevKV, kv) <= 0 : "Key " + prevKV +771 + " followed by a " + "smaller key " + kv + " in cf " + store; +772 } +773 +774 protected boolean seekToNextRow(Cell c) throws IOException { +775 return reseek(CellUtil.createLastOnRow(c)); +776 } +777 +778 /** +779 * Do a reseek in a normal StoreScanner(scan forward) +780 * @param kv +781 * @return true if scanner has values left, false if end of scanner +782 * @throws IOException +783 */ +784 protected boolean seekAsDirection(Cell kv) +785 throws IOException { +786 return reseek(kv); +787 } +788 +789 @Override +790 public boolean reseek(Cell kv) throws IOException { +791 boolean flushed = checkFlushed(); +792 // Heap will not be null, if this is called from next() which. +793 // If called from RegionScanner.reseek(...) make sure the scanner +794 // stack is reset if needed. +795 checkReseek(flushed); +796 if (explicitColumnQuery && lazySeekEnabledGlobally) { +797 return heap.requestSeek(kv, true, useRowColBloom); +798 } +799 return heap.reseek(kv); +800 } +801 +802 protected boolean checkFlushed() { +803 // check the var without any lock. Suppose even if we see the old +804 // value here still it is ok to continue because we will not be resetting +805 // the heap but will continue with the referenced memstore's snapshot. For compactions +806 // any way we don't need the updateReaders at all to happen as we still continue with +807 // the older files +808 if (flushed) { +809 // If there is a flush and the current scan is notified on the flush ensure that the +810 // scan's heap gets reset and we do a seek on the newly flushed file. +811 if(!this.closing) { +812 this.lastTop = this.peek(); +813 } else { +814 return false; +815 } +816 // reset the flag +817 flushed = false; +818 return true; +819 } +820 return false; +821 } +822 +823 @Override +824 public long getSequenceID() { +825 return 0; +826 } +827 +828 /** +829 * Seek storefiles in parallel to optimize IO latency as much as possible +830 * @param scanners the list {@link KeyValueScanner}s to be read from +831 * @param kv the KeyValue on which the operation is being requested +832 * @throws IOException +833 */ +834 private void parallelSeek(final List<? extends KeyValueScanner> +835 scanners, final Cell kv) throws IOException { +836 if (scanners.isEmpty()) return; +837 int storeFileScannerCount = scanners.size(); +838 CountDownLatch latch = new CountDownLatch(storeFileScannerCount); +839 List<ParallelSeekHandler> handlers = +840 new ArrayList<ParallelSeekHandler>(storeFileScannerCount); +841 for (KeyValueScanner scanner : scanners) { +842 if (scanner instanceof StoreFileScanner) { +843 ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv, +844 this.readPt, latch); +845 executor.submit(seekHandler); +846 handlers.add(seekHandler); +847 } else { +848 scanner.seek(kv); +849 latch.countDown(); +850 } +851 } +852 +853 try { +854 latch.await(); +855 } catch (InterruptedException ie) { +856 throw (InterruptedIOException)new InterruptedIOException().initCause(ie); +857 } +858 +859 for (ParallelSeekHandler handler : handlers) { +