hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1546427 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/client/
Date Thu, 28 Nov 2013 18:13:24 GMT
Author: liyin
Date: Thu Nov 28 18:13:23 2013
New Revision: 1546427

URL: http://svn.apache.org/r1546427
Log:
[HBASE-8185] Refactoring ClientScanner

Author: daviddeng

Summary: Using ResultScannerImpl to implement ClientScanner

Test Plan: Run HTable related testcases

Reviewers: manukranthk

Reviewed By: manukranthk

CC: mahesh

Differential Revision: https://phabricator.fb.com/D1065999

Task ID: 2394125

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java?rev=1546427&r1=1546426&r2=1546427&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java Thu Nov 28 18:13:23 2013
@@ -166,7 +166,7 @@ public class ClientLocalScanner extends 
    * @throws IOException
    * @throws Exception
    */
-  protected boolean doRealOpenScanners(byte[] localStartKey)
+  protected boolean doRealOpenScanners(byte[] localStartKey, int nbRows)
       throws IOException {
     try {
       this.currentRegion =

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1546427&r1=1546426&r2=1546427&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Nov 28 18:13:23 2013
@@ -24,8 +24,6 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -35,8 +33,6 @@ import java.util.concurrent.SynchronousQ
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -84,6 +80,7 @@ public class HTable implements HTableInt
   private HBaseRPCOptions options;
   private boolean recordClientContext = false;
 
+  @SuppressWarnings("unused")
   private long maxScannerResultSize;
 
   // Share this multiaction thread pool across all the HTable instance;
@@ -371,7 +368,6 @@ public class HTable implements HTableInt
    * @return Pair of arrays of region starting and ending row keys
    * @throws IOException if a remote or network exception occurs
    */
-  @SuppressWarnings("unchecked")
   public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
     final List<byte[]> startKeyList = new ArrayList<byte[]>();
     final List<byte[]> endKeyList = new ArrayList<byte[]>();
@@ -390,8 +386,9 @@ public class HTable implements HTableInt
       }
     };
     MetaScanner.metaScan(configuration, visitor, this.tableName);
-    return new Pair(startKeyList.toArray(new byte[startKeyList.size()][]),
-                endKeyList.toArray(new byte[endKeyList.size()][]));
+    return new Pair<byte[][],byte[][]>(startKeyList.toArray(
+        new byte[startKeyList.size()][]), endKeyList.toArray(
+            new byte[endKeyList.size()][]));
   }
   
   /**
@@ -402,7 +399,6 @@ public class HTable implements HTableInt
    * @return TreeMap of {startKey,endKey} pairs
    * @throws IOException if a remote or network exception occurs
    */
-  @SuppressWarnings("unchecked")
   public TreeMap<byte[], byte[]> getStartEndKeysMap() throws IOException {
     final TreeMap<byte[], byte[]> startEndKeysMap =
       new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
@@ -1079,37 +1075,16 @@ public class HTable implements HTableInt
    * If there are multiple regions in a table, this scanner will iterate
    * through them all.
    */
-  protected class ClientScanner implements ResultScanner {
-    private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
+  protected class ClientScanner extends ResultScannerImpl {
     // HEADSUP: The scan internal start row can change as we move through table.
-    private Scan scan;
-    private boolean closed = false;
     // Current region scanner is against.  Gets cleared if current region goes
     // wonky: e.g. if it splits on us.
-    private HRegionInfo currentRegion = null;
     private ScannerCallable callable = null;
-    private final LinkedList<Result> cache = new LinkedList<Result>();
-    private final int caching;
-    private long lastNext;
     // Keep lastResult returned successfully in case we have to reset scanner.
     private Result lastResult = null;
 
     protected ClientScanner(final Scan scan) {
-      if (CLIENT_LOG.isDebugEnabled()) {
-        CLIENT_LOG.debug("Creating scanner over "
-            + Bytes.toString(getTableName())
-            + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
-      }
-      this.scan = scan;
-      this.lastNext = System.currentTimeMillis();
-
-      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
-      if (this.scan.getCaching() > 0) {
-        this.caching = this.scan.getCaching();
-      } else {
-        this.caching = HTable.this.scannerCaching;
-      }
-
+      super(scan, HTable.this);
       // Removed filter validation.  We have a new format now, only one of all
       // the current filters has a validate() method.  We can add it back,
       // need to decide on what we're going to do re: filter redesign.
@@ -1118,85 +1093,41 @@ public class HTable implements HTableInt
       // respect.
     }
 
-    public void initialize() throws IOException {
-      nextScanner(this.caching, false);
-    }
-
     protected Scan getScan() {
       return scan;
     }
 
-    protected long getTimestamp() {
-      return lastNext;
-    }
-
-    // returns true if the passed region endKey
-    private boolean checkScanStopRow(final byte [] endKey) {
-      if (this.scan.getStopRow().length > 0) {
-        // there is a stop row, check to see if we are past it.
-        byte [] stopRow = scan.getStopRow();
-        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
-          endKey, 0, endKey.length);
-        if (cmp <= 0) {
-          // stopRow <= endKey (endKey is equals to or larger than stopRow)
-          // This is a stop.
-          return true;
-        }
-      }
-      return false; //unlikely.
+    protected ScannerCallable getScannerCallable(byte [] localStartKey,
+        int nbRows, HBaseRPCOptions options) {
+      scan.setStartRow(localStartKey);
+      ScannerCallable s = new ScannerCallable(
+          getConnectionAndResetOperationContext(), getTableName(), scan,
+          options);
+      s.setCaching(nbRows);
+      return s;
     }
 
-    /*
-     * Gets a scanner for the next region.  If this.currentRegion != null, then
-     * we will move to the endrow of this.currentRegion.  Else we will get
-     * scanner at the scan.getStartRow().  We will go no further, just tidy
-     * up outstanding scanners, if <code>currentRegion != null</code> and
-     * <code>done</code> is true.
-     * @param nbRows
-     * @param done Server-side says we're done scanning.
-     */
-    private boolean nextScanner(int nbRows, final boolean done)
-    throws IOException {
+    @Override
+    protected void cleanUpPreviousScanners() throws IOException {
       // Close the previous scanner if it's open
       if (this.callable != null) {
         this.callable.setClose();
-        getConnectionAndResetOperationContext().getRegionServerWithRetries(callable);
+        getConnectionAndResetOperationContext().getRegionServerWithRetries(
+            callable);
         this.callable = null;
       }
 
-      // Where to start the next scanner
-      byte [] localStartKey;
-
-      // if we're at end of table, close and return false to stop iterating
-      if (this.currentRegion != null) {
-        byte [] endKey = this.currentRegion.getEndKey();
-        if (endKey == null ||
-            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
-            checkScanStopRow(endKey) ||
-            done) {
-          close();
-          if (CLIENT_LOG.isDebugEnabled()) {
-            CLIENT_LOG.debug("Finished with scanning at " + this.currentRegion);
-          }
-          return false;
-        }
-        localStartKey = endKey;
-        if (CLIENT_LOG.isDebugEnabled()) {
-          CLIENT_LOG.debug("Finished with region " + this.currentRegion);
-        }
-      } else {
-        localStartKey = this.scan.getStartRow();
-      }
+    }
 
-      if (CLIENT_LOG.isDebugEnabled()) {
-        CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
-          Bytes.toStringBinary(localStartKey) + "'");
-      }
+    @Override
+    protected boolean doRealOpenScanners(byte[] localStartKey, int nbRows)
+        throws IOException {
       try {
         callable = getScannerCallable(localStartKey, nbRows, options);
         // Open a scanner on the region server starting at the
         // beginning of the region
-        getConnectionAndResetOperationContext().getRegionServerWithRetries(callable);
+        getConnectionAndResetOperationContext().getRegionServerWithRetries(
+            callable);
         this.currentRegion = callable.getHRegionInfo();
       } catch (IOException e) {
         close();
@@ -1205,116 +1136,80 @@ public class HTable implements HTableInt
       return true;
     }
 
-    protected ScannerCallable getScannerCallable(byte [] localStartKey,
-        int nbRows, HBaseRPCOptions options) {
-      scan.setStartRow(localStartKey);
-      ScannerCallable s = new ScannerCallable(getConnectionAndResetOperationContext(),
-        getTableName(), scan, options);
-      s.setCaching(nbRows);
-      return s;
-    }
-
-    public Result next() throws IOException {
-      // If the scanner is closed but there is some rows left in the cache,
-      // it will first empty it before returning null
-      if (cache.size() == 0 && this.closed) {
-        return null;
-      }
-      if (cache.size() == 0) {
-        Result [] values = null;
-        // We need to reset it if it's a new callable that was created
-        // with a countdown in nextScanner
-        callable.setCaching(this.caching);
-        // This flag is set when we want to skip the result returned.  We do
-        // this when we reset scanner because it split under us.
-        boolean skipFirst = false;
-        boolean foundResults = false;
-        do {
-          try {
-            // Server returns a null values if scanning is to stop.  Else,
-            // returns an empty array if scanning is to go on and we've just
-            // exhausted current region.
-            values = getConnectionAndResetOperationContext().getRegionServerWithRetries(callable);
-            if (skipFirst) {
-              skipFirst = false;
-              // Reget.
-              values = getConnectionAndResetOperationContext().getRegionServerWithRetries(callable);
-            }
-          } catch (DoNotRetryIOException e) {
-            if (e instanceof UnknownScannerException) {
-              long timeout = lastNext + scannerTimeout;
-              // If we are over the timeout, throw this exception to the client
-              // Else, it's because the region moved and we used the old id
-              // against the new region server; reset the scanner.
-              if (timeout < System.currentTimeMillis()) {
-                long elapsed = System.currentTimeMillis() - lastNext;
-                ScannerTimeoutException ex = new ScannerTimeoutException(
-                    elapsed + "ms passed since the last invocation, " +
-                    "timeout is currently set to " + scannerTimeout);
-                ex.initCause(e);
-                throw ex;
-              }
-            } else {
-              Throwable cause = e.getCause();
-              if (cause == null || !(cause instanceof NotServingRegionException)) {
-                throw e;
-              }
+    @Override
+    protected void cacheNextResults() throws IOException {
+      Result [] values = null;
+      // We need to reset it if it's a new callable that was created
+      // with a countdown in nextScanner
+      callable.setCaching(this.caching);
+      // This flag is set when we want to skip the result returned.  We do
+      // this when we reset scanner because it split under us.
+      boolean skipFirst = false;
+      boolean foundResults = false;
+      do {
+        try {
+          // Server returns a null values if scanning is to stop.  Else,
+          // returns an empty array if scanning is to go on and we've just
+          // exhausted current region.
+          values = getConnectionAndResetOperationContext(
+              ).getRegionServerWithRetries(callable);
+          if (skipFirst) {
+            skipFirst = false;
+            // Reget.
+            values = getConnectionAndResetOperationContext()
+                .getRegionServerWithRetries(callable);
+          }
+        } catch (DoNotRetryIOException e) {
+          if (e instanceof UnknownScannerException) {
+            long timeout = this.lastNextCallTimeStamp + scannerTimeout;
+            // If we are over the timeout, throw this exception to the client
+            // Else, it's because the region moved and we used the old id
+            // against the new region server; reset the scanner.
+            if (timeout < System.currentTimeMillis()) {
+              long elapsed = System.currentTimeMillis()
+                  - this.lastNextCallTimeStamp;
+              ScannerTimeoutException ex = new ScannerTimeoutException(
+                  elapsed + "ms passed since the last invocation, " +
+                  "timeout is currently set to " + scannerTimeout);
+              ex.initCause(e);
+              throw ex;
             }
-            // Else, its signal from depths of ScannerCallable that we got an
-            // NSRE on a next and that we need to reset the scanner.
-            if (this.lastResult != null) {
-              this.scan.setStartRow(this.lastResult.getRow());
-              // Skip first row returned.  We already let it out on previous
-              // invocation.
-              skipFirst = true;
+          } else {
+            Throwable cause = e.getCause();
+            if (cause == null
+                || !(cause instanceof NotServingRegionException)) {
+              throw e;
             }
-            // Clear region
-            this.currentRegion = null;
           }
-          lastNext = System.currentTimeMillis();
-          if (values != null && values.length > 0) {
-            foundResults = true;
-            for (Result rs : values) {
-              cache.add(rs);
-              this.lastResult = rs;
-            }
+          // Else, its signal from depths of ScannerCallable that we got an
+          // NSRE on a next and that we need to reset the scanner.
+          if (this.lastResult != null) {
+            this.scan.setStartRow(this.lastResult.getRow());
+            // Skip first row returned.  We already let it out on previous
+            // invocation.
+            skipFirst = true;
           }
-        } while (!foundResults && nextScanner(this.caching, values == null));
-      }
-      if (cache.size() > 0) {
-        return cache.poll();
-      }
-      return null;
-    }
-
-    /**
-     * Get <param>nbRows</param> rows.
-     * How many RPCs are made is determined by the {@link Scan#setCaching(int)}
-     * setting (or hbase.client.scanner.caching in hbase-site.xml).
-     * @param nbRows number of rows to return
-     * @return Between zero and <param>nbRows</param> RowResults.  Scan is done
-     * if returned array is of zero-length (We never return null).
-     * @throws IOException
-     */
-    public Result [] next(int nbRows) throws IOException {
-      // Collect values to be returned here
-      ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
-      for(int i = 0; i < nbRows; i++) {
-        Result next = next();
-        if (next != null) {
-          resultSets.add(next);
-        } else {
-          break;
+          // Clear region
+          this.currentRegion = null;
         }
-      }
-      return resultSets.toArray(new Result[resultSets.size()]);
+        this.lastNextCallTimeStamp = System.currentTimeMillis();
+        if (values != null && values.length > 0) {
+          foundResults = true;
+          for (Result rs : values) {
+            cache.add(rs);
+            this.lastResult = rs;
+          }
+        }
+      } while (!foundResults && nextScanner(this.caching, values == null));
     }
 
-    public void close() {
+    @Override
+    protected void closeCurrentScanner() {
       if (callable != null) {
         callable.setClose();
         try {
-          getConnectionAndResetOperationContext().getRegionServerWithRetries(callable);
+          getConnectionAndResetOperationContext().getRegionServerWithRetries(
+              callable);
         } catch (IOException e) {
           // We used to catch this error, interpret, and rethrow. However, we
           // have since decided that it's not nice for a scanner's close to
@@ -1323,51 +1218,6 @@ public class HTable implements HTableInt
         }
         callable = null;
       }
-      closed = true;
-    }
-
-    public Iterator<Result> iterator() {
-      return new Iterator<Result>() {
-        // The next RowResult, possibly pre-read
-        Result next = null;
-
-        // return true if there is another item pending, false if there isn't.
-        // this method is where the actual advancing takes place, but you need
-        // to call next() to consume it. hasNext() will only advance if there
-        // isn't a pending next().
-        public boolean hasNext() {
-          if (next == null) {
-            try {
-              next = ClientScanner.this.next();
-              return next != null;
-            } catch (IOException e) {
-              throw new RuntimeException(e);
-            }
-          }
-          return true;
-        }
-
-        // get the pending next item and advance the iterator. returns null if
-        // there is no next item.
-        public Result next() {
-          // since hasNext() does the real advancing, we call this to determine
-          // if there is a next before proceeding.
-          if (!hasNext()) {
-            return null;
-          }
-
-          // if we get to here, then hasNext() has given us an item to return.
-          // we want to return the item and then null out the next pointer, so
-          // we use a temporary variable.
-          Result temp = next;
-          next = null;
-          return temp;
-        }
-
-        public void remove() {
-          throw new UnsupportedOperationException();
-        }
-      };
     }
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java?rev=1546427&r1=1546426&r2=1546427&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ResultScannerImpl.java Thu Nov 28 18:13:23 2013
@@ -116,20 +116,21 @@ public abstract class ResultScannerImpl 
       CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
         Bytes.toStringBinary(localStartKey) + "'");
     }
-    return doRealOpenScanners(localStartKey);
+    return doRealOpenScanners(localStartKey, nbRows);
   }
 
   /**
    * This function is intended clean up the previous scanners before opening
    * a new scanner.
+   * @throws IOException
    */
-  protected abstract void cleanUpPreviousScanners();
+  protected abstract void cleanUpPreviousScanners() throws IOException;
 
   /**
    * Opens the scanners once the start key and current region is identified
    */
-  protected abstract boolean doRealOpenScanners(byte[] localStartKey)
-  throws IOException;
+  protected abstract boolean doRealOpenScanners(byte[] localStartKey,
+      int nbRows) throws IOException;
 
   @Override
   public Result next() throws IOException {

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java?rev=1546427&r1=1546426&r2=1546427&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide2.java Thu Nov 28 18:13:23 2013
@@ -1,27 +1,23 @@
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
-import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueTestUtil;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 



Mime
View raw message