Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D6B9B10AB3 for ; Thu, 28 Nov 2013 18:14:34 +0000 (UTC) Received: (qmail 34564 invoked by uid 500); 28 Nov 2013 18:14:00 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 34441 invoked by uid 500); 28 Nov 2013 18:13:53 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 34416 invoked by uid 99); 28 Nov 2013 18:13:52 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Nov 2013 18:13:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Nov 2013 18:13:46 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 8523C23889ED; Thu, 28 Nov 2013 18:13:24 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1546427 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/client/ Date: Thu, 28 Nov 2013 18:13:24 -0000 To: commits@hbase.apache.org From: liyin@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131128181324.8523C23889ED@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: liyin Date: Thu Nov 28 18:13:23 2013 New Revision: 1546427 URL: http://svn.apache.org/r1546427 Log: [HBASE-8185] Refactoring ClientScanner Author: daviddeng Summary: Using ResultScannerImpl to implement ClientScanner Test Plan: Run HTable related testcases Reviewers: manukranthk Reviewed By: manukranthk CC: mahesh Differential Revision: https://phabricator.fb.com/D1065999 Task ID: 2394125 Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java?rev=1546427&r1=1546426&r2=1546427&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java Thu Nov 28 18:13:23 2013 @@ -166,7 +166,7 @@ public class ClientLocalScanner extends * @throws IOException * @throws Exception */ - protected boolean doRealOpenScanners(byte[] localStartKey) + protected boolean doRealOpenScanners(byte[] localStartKey, int nbRows) throws IOException { try { this.currentRegion = Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1546427&r1=1546426&r2=1546427&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Nov 28 18:13:23 2013 @@ -24,8 +24,6 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -35,8 +33,6 @@ import java.util.concurrent.SynchronousQ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -84,6 +80,7 @@ public class HTable implements HTableInt private HBaseRPCOptions options; private boolean recordClientContext = false; + @SuppressWarnings("unused") private long maxScannerResultSize; // Share this multiaction thread pool across all the HTable instance; @@ -371,7 +368,6 @@ public class HTable implements HTableInt * @return Pair of arrays of region starting and ending row keys * @throws IOException if a remote or network exception occurs */ - @SuppressWarnings("unchecked") public Pair getStartEndKeys() throws IOException { final List startKeyList = new ArrayList(); final List endKeyList = new ArrayList(); @@ -390,8 +386,9 @@ public class HTable implements HTableInt } }; MetaScanner.metaScan(configuration, visitor, this.tableName); - return new Pair(startKeyList.toArray(new byte[startKeyList.size()][]), - endKeyList.toArray(new byte[endKeyList.size()][])); + return new Pair(startKeyList.toArray( + new byte[startKeyList.size()][]), endKeyList.toArray( + new byte[endKeyList.size()][])); } /** @@ -402,7 +399,6 @@ public class HTable implements HTableInt * @return TreeMap of {startKey,endKey} pairs * @throws IOException if a remote or network exception occurs */ - @SuppressWarnings("unchecked") public TreeMap getStartEndKeysMap() throws IOException { final TreeMap startEndKeysMap = new TreeMap(new Bytes.ByteArrayComparator()); @@ -1079,37 +1075,16 @@ public class HTable implements HTableInt * If there are multiple regions in a table, this scanner will iterate * through them all. */ - protected class ClientScanner implements ResultScanner { - private final Log CLIENT_LOG = LogFactory.getLog(this.getClass()); + protected class ClientScanner extends ResultScannerImpl { // HEADSUP: The scan internal start row can change as we move through table. - private Scan scan; - private boolean closed = false; // Current region scanner is against. Gets cleared if current region goes // wonky: e.g. if it splits on us. - private HRegionInfo currentRegion = null; private ScannerCallable callable = null; - private final LinkedList cache = new LinkedList(); - private final int caching; - private long lastNext; // Keep lastResult returned successfully in case we have to reset scanner. private Result lastResult = null; protected ClientScanner(final Scan scan) { - if (CLIENT_LOG.isDebugEnabled()) { - CLIENT_LOG.debug("Creating scanner over " - + Bytes.toString(getTableName()) - + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'"); - } - this.scan = scan; - this.lastNext = System.currentTimeMillis(); - - // Use the caching from the Scan. If not set, use the default cache setting for this table. - if (this.scan.getCaching() > 0) { - this.caching = this.scan.getCaching(); - } else { - this.caching = HTable.this.scannerCaching; - } - + super(scan, HTable.this); // Removed filter validation. We have a new format now, only one of all // the current filters has a validate() method. We can add it back, // need to decide on what we're going to do re: filter redesign. @@ -1118,85 +1093,41 @@ public class HTable implements HTableInt // respect. } - public void initialize() throws IOException { - nextScanner(this.caching, false); - } - protected Scan getScan() { return scan; } - protected long getTimestamp() { - return lastNext; - } - - // returns true if the passed region endKey - private boolean checkScanStopRow(final byte [] endKey) { - if (this.scan.getStopRow().length > 0) { - // there is a stop row, check to see if we are past it. - byte [] stopRow = scan.getStopRow(); - int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, - endKey, 0, endKey.length); - if (cmp <= 0) { - // stopRow <= endKey (endKey is equals to or larger than stopRow) - // This is a stop. - return true; - } - } - return false; //unlikely. + protected ScannerCallable getScannerCallable(byte [] localStartKey, + int nbRows, HBaseRPCOptions options) { + scan.setStartRow(localStartKey); + ScannerCallable s = new ScannerCallable( + getConnectionAndResetOperationContext(), getTableName(), scan, + options); + s.setCaching(nbRows); + return s; } - /* - * Gets a scanner for the next region. If this.currentRegion != null, then - * we will move to the endrow of this.currentRegion. Else we will get - * scanner at the scan.getStartRow(). We will go no further, just tidy - * up outstanding scanners, if currentRegion != null and - * done is true. - * @param nbRows - * @param done Server-side says we're done scanning. - */ - private boolean nextScanner(int nbRows, final boolean done) - throws IOException { + @Override + protected void cleanUpPreviousScanners() throws IOException { // Close the previous scanner if it's open if (this.callable != null) { this.callable.setClose(); - getConnectionAndResetOperationContext().getRegionServerWithRetries(callable); + getConnectionAndResetOperationContext().getRegionServerWithRetries( + callable); this.callable = null; } - // Where to start the next scanner - byte [] localStartKey; - - // if we're at end of table, close and return false to stop iterating - if (this.currentRegion != null) { - byte [] endKey = this.currentRegion.getEndKey(); - if (endKey == null || - Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || - checkScanStopRow(endKey) || - done) { - close(); - if (CLIENT_LOG.isDebugEnabled()) { - CLIENT_LOG.debug("Finished with scanning at " + this.currentRegion); - } - return false; - } - localStartKey = endKey; - if (CLIENT_LOG.isDebugEnabled()) { - CLIENT_LOG.debug("Finished with region " + this.currentRegion); - } - } else { - localStartKey = this.scan.getStartRow(); - } + } - if (CLIENT_LOG.isDebugEnabled()) { - CLIENT_LOG.debug("Advancing internal scanner to startKey at '" + - Bytes.toStringBinary(localStartKey) + "'"); - } + @Override + protected boolean doRealOpenScanners(byte[] localStartKey, int nbRows) + throws IOException { try { callable = getScannerCallable(localStartKey, nbRows, options); // Open a scanner on the region server starting at the // beginning of the region - getConnectionAndResetOperationContext().getRegionServerWithRetries(callable); + getConnectionAndResetOperationContext().getRegionServerWithRetries( + callable); this.currentRegion = callable.getHRegionInfo(); } catch (IOException e) { close(); @@ -1205,116 +1136,80 @@ public class HTable implements HTableInt return true; } - protected ScannerCallable getScannerCallable(byte [] localStartKey, - int nbRows, HBaseRPCOptions options) { - scan.setStartRow(localStartKey); - ScannerCallable s = new ScannerCallable(getConnectionAndResetOperationContext(), - getTableName(), scan, options); - s.setCaching(nbRows); - return s; - } - - public Result next() throws IOException { - // If the scanner is closed but there is some rows left in the cache, - // it will first empty it before returning null - if (cache.size() == 0 && this.closed) { - return null; - } - if (cache.size() == 0) { - Result [] values = null; - // We need to reset it if it's a new callable that was created - // with a countdown in nextScanner - callable.setCaching(this.caching); - // This flag is set when we want to skip the result returned. We do - // this when we reset scanner because it split under us. - boolean skipFirst = false; - boolean foundResults = false; - do { - try { - // Server returns a null values if scanning is to stop. Else, - // returns an empty array if scanning is to go on and we've just - // exhausted current region. - values = getConnectionAndResetOperationContext().getRegionServerWithRetries(callable); - if (skipFirst) { - skipFirst = false; - // Reget. - values = getConnectionAndResetOperationContext().getRegionServerWithRetries(callable); - } - } catch (DoNotRetryIOException e) { - if (e instanceof UnknownScannerException) { - long timeout = lastNext + scannerTimeout; - // If we are over the timeout, throw this exception to the client - // Else, it's because the region moved and we used the old id - // against the new region server; reset the scanner. - if (timeout < System.currentTimeMillis()) { - long elapsed = System.currentTimeMillis() - lastNext; - ScannerTimeoutException ex = new ScannerTimeoutException( - elapsed + "ms passed since the last invocation, " + - "timeout is currently set to " + scannerTimeout); - ex.initCause(e); - throw ex; - } - } else { - Throwable cause = e.getCause(); - if (cause == null || !(cause instanceof NotServingRegionException)) { - throw e; - } + @Override + protected void cacheNextResults() throws IOException { + Result [] values = null; + // We need to reset it if it's a new callable that was created + // with a countdown in nextScanner + callable.setCaching(this.caching); + // This flag is set when we want to skip the result returned. We do + // this when we reset scanner because it split under us. + boolean skipFirst = false; + boolean foundResults = false; + do { + try { + // Server returns a null values if scanning is to stop. Else, + // returns an empty array if scanning is to go on and we've just + // exhausted current region. + values = getConnectionAndResetOperationContext( + ).getRegionServerWithRetries(callable); + if (skipFirst) { + skipFirst = false; + // Reget. + values = getConnectionAndResetOperationContext() + .getRegionServerWithRetries(callable); + } + } catch (DoNotRetryIOException e) { + if (e instanceof UnknownScannerException) { + long timeout = this.lastNextCallTimeStamp + scannerTimeout; + // If we are over the timeout, throw this exception to the client + // Else, it's because the region moved and we used the old id + // against the new region server; reset the scanner. + if (timeout < System.currentTimeMillis()) { + long elapsed = System.currentTimeMillis() + - this.lastNextCallTimeStamp; + ScannerTimeoutException ex = new ScannerTimeoutException( + elapsed + "ms passed since the last invocation, " + + "timeout is currently set to " + scannerTimeout); + ex.initCause(e); + throw ex; } - // Else, its signal from depths of ScannerCallable that we got an - // NSRE on a next and that we need to reset the scanner. - if (this.lastResult != null) { - this.scan.setStartRow(this.lastResult.getRow()); - // Skip first row returned. We already let it out on previous - // invocation. - skipFirst = true; + } else { + Throwable cause = e.getCause(); + if (cause == null + || !(cause instanceof NotServingRegionException)) { + throw e; } - // Clear region - this.currentRegion = null; } - lastNext = System.currentTimeMillis(); - if (values != null && values.length > 0) { - foundResults = true; - for (Result rs : values) { - cache.add(rs); - this.lastResult = rs; - } + // Else, its signal from depths of ScannerCallable that we got an + // NSRE on a next and that we need to reset the scanner. + if (this.lastResult != null) { + this.scan.setStartRow(this.lastResult.getRow()); + // Skip first row returned. We already let it out on previous + // invocation. + skipFirst = true; } - } while (!foundResults && nextScanner(this.caching, values == null)); - } - if (cache.size() > 0) { - return cache.poll(); - } - return null; - } - - /** - * Get nbRows rows. - * How many RPCs are made is determined by the {@link Scan#setCaching(int)} - * setting (or hbase.client.scanner.caching in hbase-site.xml). - * @param nbRows number of rows to return - * @return Between zero and nbRows RowResults. Scan is done - * if returned array is of zero-length (We never return null). - * @throws IOException - */ - public Result [] next(int nbRows) throws IOException { - // Collect values to be returned here - ArrayList resultSets = new ArrayList(nbRows); - for(int i = 0; i < nbRows; i++) { - Result next = next(); - if (next != null) { - resultSets.add(next); - } else { - break; + // Clear region + this.currentRegion = null; } - } - return resultSets.toArray(new Result[resultSets.size()]); + this.lastNextCallTimeStamp = System.currentTimeMillis(); + if (values != null && values.length > 0) { + foundResults = true; + for (Result rs : values) { + cache.add(rs); + this.lastResult = rs; + } + } + } while (!foundResults && nextScanner(this.caching, values == null)); } - public void close() { + @Override + protected void closeCurrentScanner() { if (callable != null) { callable.setClose(); try { - getConnectionAndResetOperationContext().getRegionServerWithRetries(callable); + getConnectionAndResetOperationContext().getRegionServerWithRetries( + callable); } catch (IOException e) { // We used to catch this error, interpret, and rethrow. However, we // have since decided that it's not nice for a scanner's close to @@ -1323,51 +1218,6 @@ public class HTable implements HTableInt } callable = null; } - closed = true; - } - - public Iterator iterator() { - return new Iterator() { - // The next RowResult, possibly pre-read - Result next = null; - - // return true if there is another item pending, false if there isn't. - // this method is where the actual advancing takes place, but you need - // to call next() to consume it. hasNext() will only advance if there - // isn't a pending next(). - public boolean hasNext() { - if (next == null) { - try { - next = ClientScanner.this.next(); - return next != null; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return true; - } - - // get the pending next item and advance the iterator. returns null if - // there is no next item. - public Result next() { - // since hasNext() does the real advancing, we call this to determine - // if there is a next before proceeding. - if (!hasNext()) { - return null; - } - - // if we get to here, then hasNext() has given us an item to return. - // we want to return the item and then null out the next pointer, so - // we use a temporary variable. - Result temp = next; - next = null; - return temp; - } - - public void remove() { - throw new UnsupportedOperationException(); - } - }; } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java?rev=1546427&r1=1546426&r2=1546427&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java Thu Nov 28 18:13:23 2013 @@ -116,20 +116,21 @@ public abstract class ResultScannerImpl CLIENT_LOG.debug("Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); } - return doRealOpenScanners(localStartKey); + return doRealOpenScanners(localStartKey, nbRows); } /** * This function is intended clean up the previous scanners before opening * a new scanner. + * @throws IOException */ - protected abstract void cleanUpPreviousScanners(); + protected abstract void cleanUpPreviousScanners() throws IOException; /** * Opens the scanners once the start key and current region is identified */ - protected abstract boolean doRealOpenScanners(byte[] localStartKey) - throws IOException; + protected abstract boolean doRealOpenScanners(byte[] localStartKey, + int nbRows) throws IOException; @Override public Result next() throws IOException { Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java?rev=1546427&r1=1546426&r2=1546427&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java Thu Nov 28 18:13:23 2013 @@ -1,27 +1,23 @@ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Random; -import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; import org.apache.hadoop.hbase.filter.ColumnRangeFilter; -import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test;