hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From li...@apache.org
Subject svn commit: r1407699 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hb...
Date Sat, 10 Nov 2012 00:01:44 GMT
Author: liyin
Date: Sat Nov 10 00:01:42 2012
New Revision: 1407699

URL: http://svn.apache.org/viewvc?rev=1407699&view=rev
Log:
JIRA [HBASE-6770]: Allow scanner to specify the response size

Author: cjin

Summary: Allow a scanner to specify the maximum response size directly, rather than setting
both batch and caching to control the response size. The scanner can also specify whether
the last row may be truncated (partialRow) or must be returned whole.

Test Plan: unit tests. Add TestMaxResponseSize.java to test the new fields (maxResponseSize,
partialRow) and add testRegionScannerNextRows() in TestHRegion.java

Reviewers: aaiyer, kranganathan, kannan, liyintang

Reviewed By: kannan

CC: hbase-eng@, alex

Differential Revision: https://phabricator.fb.com/D607836

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1407699&r1=1407698&r2=1407699&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sat Nov 10
00:01:42 2012
@@ -472,7 +472,7 @@ public final class HConstants {
      * scanner's next method.
      */
   public static String HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY = "hbase.client.scanner.max.result.size";
-
+  
   /**
    * Maximum number of bytes returned when calling a scanner's next method.
    * Note that when a single row is larger than this limit the row is still
@@ -484,6 +484,15 @@ public final class HConstants {
 
 
   /**
+   * Maximum number of bytes returned when calling a scanner's next method.
+   * Used with partialRow parameter on the client side.  Note that when a 
+   * single row is larger than this limit, the row is still returned completely
+   * if partialRow is false; otherwise, the row will be truncated in order to
+   * fit the memory.
+   */
+  public static int DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE = Integer.MAX_VALUE;
+
+  /**
    * HRegion server lease period in milliseconds. Clients must report in within this period
    * else they are considered dead. Unit measured in ms (milliseconds).
    */

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1407699&r1=1407698&r2=1407699&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Sat Nov
10 00:01:42 2012
@@ -1065,14 +1065,13 @@ public class HTable implements HTableInt
       }
       if (cache.size() == 0) {
         Result [] values = null;
-        long remainingResultSize = maxScannerResultSize;
-        int countdown = this.caching;
         // We need to reset it if it's a new callable that was created
         // with a countdown in nextScanner
         callable.setCaching(this.caching);
         // This flag is set when we want to skip the result returned.  We do
         // this when we reset scanner because it split under us.
         boolean skipFirst = false;
+        boolean foundResults = false;
         do {
           try {
             // Server returns a null values if scanning is to stop.  Else,
@@ -1094,7 +1093,7 @@ public class HTable implements HTableInt
                 long elapsed = System.currentTimeMillis() - lastNext;
                 ScannerTimeoutException ex = new ScannerTimeoutException(
                     elapsed + "ms passed since the last invocation, " +
-                        "timeout is currently set to " + scannerTimeout);
+                    "timeout is currently set to " + scannerTimeout);
                 ex.initCause(e);
                 throw ex;
               }
@@ -1114,23 +1113,17 @@ public class HTable implements HTableInt
             }
             // Clear region
             this.currentRegion = null;
-            continue;
           }
           lastNext = System.currentTimeMillis();
           if (values != null && values.length > 0) {
+            foundResults = true;
             for (Result rs : values) {
               cache.add(rs);
-              for (KeyValue kv : rs.raw()) {
-                  remainingResultSize -= kv.heapSize();
-              }
-              countdown--;
               this.lastResult = rs;
             }
           }
-          // Values == null means server-side filter has determined we must STOP
-        } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
+        } while (!foundResults && nextScanner(this.caching, values == null));
       }
-
       if (cache.size() > 0) {
         return cache.poll();
       }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1407699&r1=1407698&r2=1407699&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Sat Nov
10 00:01:42 2012
@@ -83,8 +83,11 @@ import java.util.TreeSet;
  * execute {@link #setCacheBlocks(boolean)}.
  */
 public class Scan extends Operation implements Writable {
-  private static final byte SCAN_VERSION = (byte)3;
-
+  private static final int VERSION_STORE_LIMIT = 2;
+  private static final int VERSION_STORE_OFFSET = 3;
+  private static final int VERSION_RESPONSE_SIZE = 4;
+  private static final byte SCAN_VERSION = VERSION_RESPONSE_SIZE;
+  
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
   private int maxVersions = 1;
@@ -92,6 +95,9 @@ public class Scan extends Operation impl
   private int storeLimit = -1;
   private int storeOffset = 0;
   private int caching = -1;
+  private int maxResponseSize = HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE;
+  private int currentPartialResponseSize = 0;
+  private boolean partialRow = false;
   private boolean cacheBlocks = true;
   private Filter filter = null;
   private TimeRange tr = new TimeRange();
@@ -331,6 +337,55 @@ public class Scan extends Operation impl
    */
   public void setCaching(int caching) {
     this.caching = caching;
+    this.partialRow = false;
+    this.maxResponseSize = HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE;
+  }
+
+  /**
+   * This is technically not the max available memory setting, more of a hint. 
+   * We will add KV's till we exceed this setting if partialRow is true, 
+   * and add entire rows till we exceed this setting if partialRow is false.
+   * !!!NOTE!!!: this call will overwrite the caching setting and set it to
+   * Integer.MAX_VALUE. If you want a row-based constraint instead, use
+   * setCaching(int caching), which will reset maxResponseSize to its default
+   * (HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE) and disable partial row.
+   */
+  public void setCaching(int responseSize, boolean partialRow) {
+    this.maxResponseSize = responseSize;
+    this.partialRow = partialRow;
+    this.caching = Integer.MAX_VALUE; 
+  }
+
+  /**
+   * @return maximum response size that client can handle for a single call to next()
+   */
+  public int getMaxResponseSize() {
+    return this.maxResponseSize;
+  }
+
+  /**
+   * @return whether the last row can be partially transferred for a single call to next()
+   */
+  public boolean isPartialRow() {
+    return this.partialRow;
+  }
+
+  /**
+   * Set currentPartialResponseSize to accumulated response size 
+   * for all the KeyValue pairs collected so far. This is only used at
+   * server side, and not used as a client API. 
+   * @param responseSize
+   */
+  public void setCurrentPartialResponseSize(int responseSize) {
+    this.currentPartialResponseSize = responseSize;
+  }
+
+  /*
+   * Get current PartialResponseSize. This is only used at server side, 
+   * and not used as a client API. 
+   */
+  public int getCurrentPartialResponseSize() {
+    return this.currentPartialResponseSize;
   }
 
   /**
@@ -528,6 +583,8 @@ public class Scan extends Operation impl
     map.put("caching", this.caching);
     map.put("cacheBlocks", this.cacheBlocks);
     map.put("storeLimit", this.storeLimit);
+    map.put("maxResponseSize", this.maxResponseSize);
+    map.put("partialRow", this.partialRow);
     List<Long> timeRange = new ArrayList<Long>();
     timeRange.add(this.tr.getMin());
     timeRange.add(this.tr.getMax());
@@ -584,12 +641,16 @@ public class Scan extends Operation impl
     this.stopRow = Bytes.readByteArray(in);
     this.maxVersions = in.readInt();
     this.batch = in.readInt();
-    if (version > 1) {
+    if (version >= VERSION_STORE_LIMIT) {
       this.storeLimit = in.readInt();
     }
-    if (version > 2) {
+    if (version >= VERSION_STORE_OFFSET) {
       this.storeOffset = in.readInt();
     }
+    if (version >= VERSION_RESPONSE_SIZE) {
+      this.maxResponseSize = in.readInt();
+      this.partialRow = in.readBoolean();
+    }
     this.caching = in.readInt();
     this.cacheBlocks = in.readBoolean();
     if(in.readBoolean()) {
@@ -616,22 +677,29 @@ public class Scan extends Operation impl
   public void write(final DataOutput out)
   throws IOException {
     byte version = (byte)1;
-    if (this.storeOffset != 0) {
-      version = SCAN_VERSION;
+    if (this.maxResponseSize != HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE) {
+      version = (byte) VERSION_RESPONSE_SIZE;
+    } else if (this.storeOffset != 0) {
+      version = (byte)VERSION_STORE_OFFSET;
     } else if (this.storeLimit != -1) {
-      version = 2;
+      version = (byte)VERSION_STORE_LIMIT;
     }
+
     out.writeByte(version);
     Bytes.writeByteArray(out, this.startRow);
     Bytes.writeByteArray(out, this.stopRow);
     out.writeInt(this.maxVersions);
     out.writeInt(this.batch);
-    if (version > 1) {
+    if (version >= VERSION_STORE_LIMIT) {
       out.writeInt(this.storeLimit);
     }
-    if (version > 2) {
+    if (version >= VERSION_STORE_OFFSET) {
       out.writeInt(this.storeOffset);
     }
+    if (version >= VERSION_RESPONSE_SIZE) {
+      out.writeInt(this.maxResponseSize);
+      out.writeBoolean(this.partialRow);
+    }
     out.writeInt(this.caching);
     out.writeBoolean(this.cacheBlocks);
     if(this.filter == null) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1407699&r1=1407698&r2=1407699&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Sat Nov 10 00:01:42 2012
@@ -3084,10 +3084,7 @@ public class HRegion implements HeapSize
       return next(outResults, limit, null);
     }
 
-    @Override
-    public synchronized boolean next(List<KeyValue> outResults, int limit,
-        String metric) throws IOException {
-      readRequests.incrTotalRequstCount();
+    private void preCondition() throws IOException{
       if (this.filterClosed) {
         throw new UnknownScannerException("Scanner was closed (timed out?) " +
             "after we renewed it. Could be caused by a very slow scanner " +
@@ -3101,7 +3098,68 @@ public class HRegion implements HeapSize
 
       // This could be a new thread from the last time we called next().
       MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
+    }
 
+    /**
+     * A method to return all the rows that can fit in the response size.
+     * it respects the two stop conditions:
+     * 1) scan.getMaxResponseSize
+     * 2) scan.getCaching() (which is nbRows) 
+     * the loop breaks on whichever limit is reached first.
+     * This is only used by scan(), not get().
+     * Nothing is returned; fetched rows are appended to outResults.
+     * @param outResults a list of rows to return
+     * @param nbRows the number of rows that can be returned at most
+     * @param metric the metric name
+     *
+     * This is used by Scans.
+     */
+    public synchronized void nextRows(List<Result> outResults, int nbRows, 
+        String metric) throws IOException {
+      preCondition(); 
+      List<KeyValue> tmpList = new ArrayList<KeyValue>();
+      int limit = this.getOriginalScan().getBatch();
+      int currentNbRows = 0;
+      boolean moreRows = true;
+      // This is necessary b/c partialResponseSize is not serialized through RPC    
+      getOriginalScan().setCurrentPartialResponseSize(0);
+      int maxResponseSize = getOriginalScan().getMaxResponseSize();
+      do {
+        moreRows = nextInternal(tmpList, limit, metric);
+        if (!tmpList.isEmpty()) {
+          currentNbRows++;
+          if (outResults != null) {
+            outResults.add(new Result(tmpList));
+            tmpList.clear();
+          }
+        }
+        resetFilters();
+        if (isFilterDone()) {
+          readRequests.incrTotalRequstCount(currentNbRows);
+          return;
+        }
+         
+        // While Condition
+        // 1. respect maxResponseSize and nbRows whichever comes first,
+        // 2. recheck the currentPartialResponseSize is to catch the case
+        //   where maxResponseSize is saturated and partialRow == false 
+        //   since we allow this case valid in the nextInternal() layer
+      } while (moreRows && 
+          (getOriginalScan().getCurrentPartialResponseSize() < maxResponseSize 
+           && currentNbRows < nbRows));
+       
+      readRequests.incrTotalRequstCount(currentNbRows);
+    }
+    
+    /**
+     * This is used by Gets & Compactions & unit tests, whereas nextRows() is
+     * used by Scans
+     */
+    @Override
+    public synchronized boolean next(List<KeyValue> outResults, int limit,
+        String metric) throws IOException {
+      readRequests.incrTotalRequstCount();
+      preCondition();
       boolean returnResult;
       if (outResults.isEmpty()) {
          // Usually outResults is empty. This is true when next is called
@@ -3151,6 +3209,9 @@ public class HRegion implements HeapSize
         throw new IllegalArgumentException("First parameter should be an empty list");
       }
 
+      boolean partialRow = getOriginalScan().isPartialRow();
+      long maxResponseSize = getOriginalScan().getMaxResponseSize();
+      
       while (true) {
         byte [] currentRow = peekRow();
         if (isStopRow(currentRow)) {
@@ -3170,10 +3231,17 @@ public class HRegion implements HeapSize
           do {
             this.storeHeap.next(results, limit - results.size(), metric);
             if (limit > 0 && results.size() == limit) {
-              if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException(
+              if (this.filter != null && filter.hasFilterRow()) 
+                throw new IncompatibleFilterException(
                   "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
               return true; // we are expecting more yes, but also limited to how many we can return.
             }
+            // Only stop mid-row when partialRow is set; otherwise we still complete the
+            // entire row even if currentPartialResponseSize exceeds maxResponseSize.
+            if (partialRow && getOriginalScan().getCurrentPartialResponseSize()
+                 >= maxResponseSize) {
+              return true;
+            }
           } while (Bytes.equals(currentRow, nextRow = peekRow()));
 
           final boolean stopRow = isStopRow(nextRow);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1407699&r1=1407698&r2=1407699&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Sat Nov 10 00:01:42 2012
@@ -228,8 +228,6 @@ public class HRegionServer implements HR
 
   protected final int numRegionsToReport;
 
-  private final long maxScannerResultSize;
-
   // Remote HMaster
   private HMasterRegionInterface hbaseMaster;
 
@@ -391,10 +389,6 @@ public class HRegionServer implements HR
 
     sleeper = new Sleeper(this.msgInterval, this);
 
-    this.maxScannerResultSize = conf.getLong(
-            HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
-            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
-
     // Task thread to process requests from Master
     this.worker = new Worker();
 
@@ -2536,7 +2530,9 @@ public class HRegionServer implements HR
   public Result [] next(final long scannerId, int nbRows) throws IOException {
     try {
       String scannerName = String.valueOf(scannerId);
-      InternalScanner s = this.scanners.get(scannerName);
+      // HRegionServer only deals with Region Scanner, 
+      // thus, we just typecast directly
+      HRegion.RegionScanner s = (HRegion.RegionScanner)this.scanners.get(scannerName);
       if (s == null) {
         throw new UnknownScannerException("Name: " + scannerName);
       }
@@ -2549,34 +2545,13 @@ public class HRegionServer implements HR
         throw e;
       }
       this.leases.renewLease(scannerName);
-      List<Result> results = new ArrayList<Result>(nbRows);
-      long currentScanResultSize = 0;
-      List<KeyValue> values = new ArrayList<KeyValue>();
-      int i = 0;
-      for (; i < nbRows && currentScanResultSize < maxScannerResultSize; i++) {
-        // Collect values to be returned here
-        boolean moreRows = s.next(values, HRegion.METRIC_NEXTSIZE);
-        if (!values.isEmpty()) {
-          if (maxScannerResultSize < Long.MAX_VALUE){
-            for (KeyValue kv : values) {
-              currentScanResultSize += kv.heapSize();
-            }
-          }
-          results.add(new Result(values));
-        }
-        if (!moreRows) {
-          break;
-        }
-        values.clear();
-      }
-      numReads.addAndGet(i);
-      // Below is an ugly hack where we cast the InternalScanner to be a
-      // HRegion.RegionScanner.  The alternative is to change InternalScanner
-      // interface but its used everywhere whereas we just need a bit of info
-      // from HRegion.RegionScanner, IF its filter if any is done with the scan
+      List<Result> results = new ArrayList<Result>();
+      s.nextRows(results, nbRows, HRegion.METRIC_NEXTSIZE); 
+      numReads.addAndGet(results.size());
+      // IF its filter if any is done with the scan
       // and wants to tell the client to stop the scan.  This is done by passing
       // a null result.
-      return ((HRegion.RegionScanner)s).isFilterDone() && results.isEmpty()?
+      return s.isFilterDone() && results.isEmpty()?
         null: results.toArray(new Result[0]);
     } catch (Throwable t) {
       if (t instanceof NotServingRegionException) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1407699&r1=1407698&r2=1407699&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
Sat Nov 10 00:01:42 2012
@@ -37,9 +37,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.regionserver.kvaggregator.KeyValueAggregator;
-
-
-import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
 
 /**
  * Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
@@ -369,7 +367,11 @@ public class StoreScanner extends NonLaz
     KeyValue.KVComparator comparator =
         store != null ? store.getComparator() : null;
 
-    long addedResultsSize = 0;
+    int addedResultsSize = 0;
+    // set the responseSize so that it now can fetch records
+    // in terms of keyvalue's boundary rather than row's boundary
+    int remainingResponseSize = scan.getMaxResponseSize()
+                              - scan.getCurrentPartialResponseSize();
     try {
       LOOP: while((kv = this.heap.peek()) != null) {
         // kv is no longer immutable due to KeyOnlyFilter! use copy for safety
@@ -436,7 +438,13 @@ public class StoreScanner extends NonLaz
             } else {
               this.heap.next();
             }
-
+            // If the user is strict about the response size and allows
+            // partialRow scanning, we only return the number of keyvalue 
+            // pairs to fill up the request size. Otherwise when partialRow 
+            // is false, we just fetch the entire row
+            if (scan.isPartialRow() && addedResultsSize >= remainingResponseSize) {
+              break LOOP;
+            }
             if (limit > 0 && (numNewKeyValues == limit)) {
               break LOOP;
             }
@@ -509,6 +517,9 @@ public class StoreScanner extends NonLaz
       throw e;
 
     } finally {
+      // update the remaining response size
+      scan.setCurrentPartialResponseSize(scan.getCurrentPartialResponseSize()
+          + addedResultsSize);
       // update the counter
       if (addedResultsSize > 0 && metric != null) {
         HRegion.incrNumericMetric(this.metricNamePrefix + metric,

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java?rev=1407699&r1=1407698&r2=1407699&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java
Sat Nov 10 00:01:42 2012
@@ -0,0 +1,310 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestMaxResponseSize{
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte [] ROW = Bytes.toBytes("testRow");
+  private static byte [] FAMILY = Bytes.toBytes("testFamily");
+  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static byte [] VALUE = Bytes.toBytes("testValue");
+  private static int NUM_RS = 3;
+  private static int NUM_REGION = 10;
+  private static int NUM_VERSION = 3;
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(NUM_RS);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
+  public void createTestTable() throws Exception {
+  }
+
+  /**
+   * Test from client side for scans that are cross multiple regions
+   * @throws Exception
+   */
+  @Test
+  public void testScanCrossRegion() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testScanCrossRegion");
+    byte[][] FAMILIES = { Bytes.toBytes("MyCF1") };
+    List<KeyValue> kvListExp = new ArrayList<KeyValue>();
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, NUM_VERSION,
+        Bytes.toBytes("row0"), Bytes.toBytes("row99"), NUM_REGION);
+    
+    TEST_UTIL.waitUntilAllRegionsAssigned(NUM_REGION);
+   
+    Random rand = new Random(System.currentTimeMillis());
+    for (int iRow = 0; iRow < 100; ++iRow) {
+      final byte[] row = Bytes.toBytes(String.format("row%02d", iRow));
+      Put put = new Put(row);
+      final long ts = System.currentTimeMillis();
+      for (int iCol = 0; iCol < 10; ++iCol) {
+        final byte[] cf = FAMILIES[0];
+
+        final byte[] qual = Bytes.toBytes("col" + iCol);
+        final byte[] value = Bytes.toBytes("value_for_row_" + iRow + "_cf_"
+            + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" + ts
+            + "_random_" + rand.nextLong());
+        KeyValue kv = new KeyValue(row, cf, qual, ts, value);
+        put.add(kv);
+        kvListExp.add(kv);
+      }
+      ht.put(put);
+      ht.flushCommits();
+    }
+
+    boolean toLog = true;
+    Scan scan = new Scan();
+    Result result;
+    // each region have 10 rows, we fetch 5 rows at a time
+    scan.setCaching(5);
+    ResultScanner scanner;
+    List<KeyValue> kvListScan = new ArrayList<KeyValue>();
+    scanner = ht.getScanner(scan);
+    kvListScan.clear();
+    // do a full scan of the table that is split among multiple regions
+    while ((result = scanner.next()) != null) {
+      for (KeyValue kv : result.list()) {
+        kvListScan.add(kv);
+      }
+    }
+    scanner.close();
+    result = new Result(kvListScan);
+    verifyResult(result, kvListExp, toLog, "testScanCrossRegion");
+  }
+
+  /**
+   * Test from client side for scan with responseSize and partialRow
+   * responseSize is small value
+   * @throws Exception
+   */
+  @Test
+  public void testScanMaxRequstSize() throws Exception {    
+    byte [] TABLE = Bytes.toBytes("testScanMaxRequstSize");
+    byte [][] ROWS= makeNAscii(ROW, 3);
+    byte [][] FAMILIES = makeNAscii(FAMILY, 3);
+    byte [][] QUALIFIERS = makeNAscii(QUALIFIER, 10);
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
+    List<KeyValue> kvListExp = new ArrayList<KeyValue>();
+    ht.setProfiling(true);
+    Put put;
+    int kvSize = (new KeyValue(ROWS[0], FAMILIES[0], QUALIFIERS[0], 1, VALUE))
+      .getLength();
+    int rowSize = (kvSize * (FAMILIES.length * QUALIFIERS.length));
+    
+    for (int r=0; r < ROWS.length; r++) {
+      put = new Put(ROWS[r]);
+      for (int c=0; c < FAMILIES.length; c++) { 
+        for (int q=0; q < QUALIFIERS.length; q++) { 
+          KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
+          put.add(kv);
+          kvListExp.add(kv);
+        }
+      }
+      ht.put(put);
+    }
+    
+    /**
+     * Test with a small responseSize that is smaller than rowSize.
+     * The response size is set to only fit half a row.
+     * if partialRow == true, the expected number of fetches is 6 in order to 
+     * retrieve all the 3 rows, otherwise we can fetch an entire row each time,
+     * which makes the expected number to be 3.
+     * 
+     * x x x x|x x x x     [2]
+     * < rpc1 >< rpc2 >
+     * x x x x|x x x x     [2]
+     * < rpc3 >< rpc4 >
+     * x x x x|x x x x     [2]
+     * < rpc5 >< rpc6 >
+     * 
+     * x: kv pair
+     * rpc#n#: n-th rpc call
+     * [n]   : the number of scanner.next() per row 
+     */
+    int responseSize = rowSize / 2;
+    // each row will take ceil(rowSize/responseSize) times to fetch
+    // and the number of rows is ROWS.length,
+    // therefore the total number of fetches is
+    // ceil(rowSize/responseSize) * ROWS.length
+    int scanCntExp = ((rowSize + responseSize - 1) / responseSize) * ROWS.length;
+    // each scanner.next trigger a rpc call
+    int rpcCntExp = scanCntExp;
+    testScan(ht, rowSize, kvSize, scanCntExp, rpcCntExp, kvListExp,
+        responseSize, true);
+    
+    /**
+    * x x x x|x x x x     [1]
+    * <     rpc1    >
+    * x x x x|x x x x     [1]
+    * <     rpc2    >
+    * x x x x|x x x x     [1]
+    * <     rpc3    >
+    */
+    scanCntExp = ROWS.length;
+    rpcCntExp = ROWS.length;
+    testScan(ht, rowSize, kvSize, scanCntExp, rpcCntExp, kvListExp,
+        responseSize, false);
+    
+    /**
+     * Test with a big responseSize across multiple rows
+     * The response size is set to only fit one and a half rows.
+     * If partialRow == true, the expected number of RPC calls is 2, and the number of
+     * scan.next() is 4, since we need two scan.next() to finish each RPC call
+     * If partialRow == false, the expected number of RPC calls is 2 and the number of 
+     * scan.next() is 3 since we need 2 scan.next() to exhaust the first RPC call.
+     * 
+     * x x x x x x x x    [1]
+     * <       rpc1
+     * x x x x|x x x x    [2]
+     *        ><
+     * x x x x x x x x    [1]
+     *   rpc2        >
+     */
+    responseSize = rowSize + rowSize / 2 ;
+    // nbRows: the number of rows that responseSize can at most contain
+    //         (including the last partial row)
+    int nbRows = (responseSize + rowSize - 1) / rowSize;
+    rpcCntExp = ROWS.length * rowSize / responseSize;
+    scanCntExp = rpcCntExp * nbRows;
+    testScan(ht, rowSize, kvSize, scanCntExp, rpcCntExp, kvListExp,
+        responseSize, true);
+    
+    /**
+    * x x x x x x x x   [1]
+    * <        rpc1
+    * x x x x x x x x   [1]
+    *               >  
+    * x x x x x x x x   [1]
+    * <     rpc2    >
+    */
+    scanCntExp = ROWS.length;
+    rpcCntExp = (ROWS.length + nbRows - 1) / nbRows;
+    testScan(ht, rowSize, kvSize, scanCntExp, rpcCntExp, kvListExp,
+        responseSize, false);
+  } 
+
+  void testScan(HTable ht, int rowSize, int kvSize,
+      int scanCntExp, int rpcCntExp, List<KeyValue> kvListExp, 
+      int responseSize, boolean partialRow) throws Exception{ 
+    int scanCntAct = 0;
+    boolean toLog  = true; 
+    Scan scan = new Scan();
+    Result result;
+    ResultScanner scanner;
+    List<KeyValue> kvListScan = new ArrayList<KeyValue>();  
+    scan.setCaching(responseSize, partialRow);
+    scanner = ht.getScanner(scan);
+    kvListScan.clear();
+    int rpcCntAct = 0;
+    while ((result = scanner.next()) != null) {
+      scanCntAct++;
+      for (KeyValue kv : result.list()) {
+        kvListScan.add(kv);
+      }
+      if (ht.getProfilingData() != null)
+        rpcCntAct++;
+    }
+    scanner.close();
+    assertEquals(scanCntExp, scanCntAct);
+    assertEquals(rpcCntExp, rpcCntAct);
+    result = new Result(kvListScan);
+    verifyResult(result, kvListExp, toLog, 
+        "Testing scan with responseSize = " + responseSize + 
+        ", partialRow = " + partialRow);
+  }
+  
+  private void verifyResult(Result result, List<KeyValue> kvList, boolean toLog,
+    String msg) {
+    LOG.info(msg);
+    LOG.info("Exp cnt: " + kvList.size());
+    LOG.info("True cnt is: " + result.size());	
+    assertEquals(kvList.size(), result.size());
+
+    if (kvList.size() == 0) return;
+    int i = 0;
+    for (KeyValue kv : result.sorted()) {
+      KeyValue kvExp = kvList.get(i++);
+      if (toLog) {
+        LOG.info("get kv is: " + kv.toString());
+        LOG.info("exp kv is: " + kvExp.toString());	  
+      }
+      assertTrue("Not equal", kvExp.equals(kv));
+    }
+  }
+
+  private byte [][] makeNAscii(byte [] base, int n) {
+    byte [][] ret = new byte[n][];
+    for(int i=0;i<n;i++) {
+      byte [] tail = Bytes.toBytes(Integer.toString(i));
+      ret[i] = Bytes.add(base, tail);
+    }
+    return ret;
+  }
+}

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1407699&r1=1407698&r2=1407699&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Sat Nov 10 00:01:42 2012
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.hbase.util.Pair;
@@ -72,8 +73,6 @@ import org.apache.hadoop.hbase.util.Thre
 
 import com.google.common.collect.Lists;
 
-import org.apache.hadoop.hbase.util.HasThread;
-
 /**
  * Basic stand-alone testing of HRegion.
  *
@@ -1466,6 +1465,96 @@ public class TestHRegion extends HBaseTe
     }
   }
 
+  /**
+   * Exercises RegionScanner.nextRows() through compareNextRows() with
+   * different response-size limits, with and without partial rows, and with
+   * a row-count limit. Three rows are written across four families and the
+   * scan selects fam2 and fam4 only.
+   */
+  public void testRegionScannerNextRows() throws IOException {
+    // Setting up region
+    String method = this.getName();
+    byte[] tableName = Bytes.toBytes("testtableNextRows");
+    // NOTE(review): the third row is named "rows3" (5 bytes), not "row3";
+    // the size-limited cases below only read from the first row.
+    byte[][] rows = {Bytes.toBytes("row1"), Bytes.toBytes("row2"), 
+                    Bytes.toBytes("rows3")};
+    byte[][] families = { Bytes.toBytes("fam1"), Bytes.toBytes("fam2"),
+        Bytes.toBytes("fam3"), Bytes.toBytes("fam4") };
+    initHRegion(tableName, method, families);
+
+    // Putting data in Region; "expected" collects the kvs of fam2 and fam4.
+    // NOTE(review): fillTable() does not read its third argument (2); it
+    // always writes three timestamps per family.
+    List<KeyValue> expected = new ArrayList<KeyValue>();
+    fillTable(rows, families, 2, expected);
+    /**
+     * In this case we know the kv size is 28 bytes for a 4-byte row name
+     * (qualifier and value are null in fillTable()):
+     * KLEN VLEN ROWLEN ROWNAME CFLEN CFNAME TS TYPE
+     * --4-|--4-|--2---|---4---|--1--|--4---|-8-|--1-- ===> 28 bytes
+     */
+    Scan scan = new Scan();
+    scan.setMaxVersions(3);
+    scan.addFamily(families[1]);
+    scan.addFamily(families[3]);
+
+    // One kv is expected even when responseSize = 0; this is the semantic
+    // that users should be aware of.
+    compareNextRows(scan, 0, true, Integer.MAX_VALUE, expected.subList(0, 1));
+    // A responseSize smaller than one kv is still expected to return one kv.
+    compareNextRows(scan, 1, true, Integer.MAX_VALUE, expected.subList(0, 1));
+    // maxResponseSize perfectly fits one kv
+    compareNextRows(scan, 28, true, Integer.MAX_VALUE, expected.subList(0, 1));
+
+    // If partialRow == true, fetch as much as maxResponseSize allows:
+    // one byte over a single kv is expected to yield two kvs.
+    compareNextRows(scan, 29, true, Integer.MAX_VALUE, expected.subList(0, 2));
+    // If partialRow == false, fetch the entire row: the first row holds
+    // 2 scanned families x 3 versions = 6 kvs.
+    compareNextRows(scan, 29, false, Integer.MAX_VALUE, expected.subList(0, 6));
+
+    // fetch everything in the table as long as responseSize is big enough
+    compareNextRows(scan, 10000, true, Integer.MAX_VALUE, expected);
+    compareNextRows(scan, 10000, false, Integer.MAX_VALUE, expected);
+
+    // Check nbRows: fetch two rows, each with two scanned families and three
+    // versions per family, i.e. 12 kvs. compareNextRows() ignores
+    // responseSize and partialRow when nbRows != Integer.MAX_VALUE.
+    compareNextRows(scan, 10000, true, 2, expected.subList(0, 12));
+    compareNextRows(scan, 10000, false, 2, expected.subList(0, 12));
+  }
+
+  /**
+   * Writes one Put per row into the region. For every family the Put carries
+   * three versions (timestamps now, now - 10 and now - 20) of a cell with a
+   * null qualifier and a null value. The KeyValues written to
+   * {@code families[1]} and {@code families[3]} are also appended to
+   * {@code expected}, in write order.
+   *
+   * @param rows row keys to write
+   * @param families column families to write; must have at least 4 entries
+   * @param nTs currently unused; three timestamps are always written.
+   *          NOTE(review): callers' expectations rely on three versions, so
+   *          honouring this argument would need the callers updated too.
+   * @param expected output list that receives the KeyValues of
+   *          families[1] and families[3]
+   * @throws IOException if region.put() fails
+   */
+  private void fillTable(byte[][] rows, byte[][] families, int nTs,
+      List<KeyValue> expected) throws IOException {
+    long ts = System.currentTimeMillis();
+    long[] timestamps = { ts, ts - 10, ts - 20 };
+    for (byte[] row : rows) {
+      Put put = new Put(row);
+      for (byte[] cf : families) {
+        // Compare family names by content. byte[].equals() is an identity
+        // check and only worked because cf is taken from families itself.
+        boolean recorded = Bytes.equals(cf, families[1])
+            || Bytes.equals(cf, families[3]);
+        for (long t : timestamps) {
+          put.add(cf, null, t, null);
+          if (recorded) {
+            expected.add(new KeyValue(row, cf, null, t, KeyValue.Type.Put,
+                  null));
+          }
+        }
+      }
+      region.put(put);
+    }
+  }
+
+  /**
+   * Configures the scan, issues a single RegionScanner.nextRows() call and
+   * asserts that the returned KeyValues equal {@code expected}, in order.
+   *
+   * @param scan the scan to run; its caching settings are overwritten here
+   * @param responseSize passed to scan.setCaching(responseSize, partialRow);
+   *          only used when nbRows == Integer.MAX_VALUE
+   * @param partialRow passed to scan.setCaching(responseSize, partialRow);
+   *          only used when nbRows == Integer.MAX_VALUE
+   * @param nbRows row limit passed to nextRows(); Integer.MAX_VALUE selects
+   *          the response-size based configuration instead of
+   *          scan.setCaching(nbRows)
+   * @param expected the KeyValues the call must return, in order
+   * @throws IOException if the scanner cannot be opened, read or closed
+   */
+  private void compareNextRows(Scan scan, int responseSize, boolean partialRow,
+      int nbRows, List<KeyValue> expected)
+      throws IOException {
+    if (nbRows == Integer.MAX_VALUE) {
+      scan.setCaching(responseSize, partialRow);
+    } else {
+      scan.setCaching(nbRows);
+    }
+    RegionScanner rs = (RegionScanner) region.getScanner(scan);
+    // Close the scanner even when nextRows() or an assertion fails, so a
+    // failing case does not leave an open scanner on the region.
+    try {
+      List<KeyValue> kvListScan = new ArrayList<KeyValue>();
+      List<Result> results = new ArrayList<Result>();
+      rs.nextRows(results, nbRows, null);
+      // Flatten the per-row Results into one ordered list of KeyValues.
+      for (Result res : results) {
+        for (KeyValue kv : res.list()) {
+          kvListScan.add(kv);
+        }
+      }
+      assertEquals(expected.size(), kvListScan.size());
+      for (int i = 0; i < kvListScan.size(); i++) {
+        assertEquals(expected.get(i), kvListScan.get(i));
+      }
+    } finally {
+      rs.close();
+    }
+  }
+
   public void testRegionScanner_Next() throws IOException {
     byte [] tableName = Bytes.toBytes("testtable");
     byte [] row1 = Bytes.toBytes("row1");

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java?rev=1407699&r1=1407698&r2=1407699&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java
Sat Nov 10 00:01:42 2012
@@ -72,6 +72,7 @@ public class TestHLogFiltering {
   private void fillTable() throws IOException, InterruptedException {
     HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES, 3,
         Bytes.toBytes("row0"), Bytes.toBytes("row99"), NUM_RS);
+    TEST_UTIL.waitUntilAllRegionsAssigned(NUM_RS);
     Random rand = new Random(19387129L);
     for (int iStoreFile = 0; iStoreFile < 4; ++iStoreFile) {
       for (int iRow = 0; iRow < 100; ++iRow) {
@@ -98,7 +99,6 @@ public class TestHLogFiltering {
         table.flushCommits();
       }
     }
-    TEST_UTIL.waitUntilAllRegionsAssigned(NUM_RS);
   }
 
   @Test



Mime
View raw message