hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbau...@apache.org
Subject svn commit: r1401278 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/client/
Date Tue, 23 Oct 2012 12:50:38 GMT
Author: mbautin
Date: Tue Oct 23 12:50:37 2012
New Revision: 1401278

URL: http://svn.apache.org/viewvc?rev=1401278&view=rev
Log:
[HBASE-6770] Reverting  patch

Author: kranganathan

Summary: Reverting patch as we want to address some changes

Test Plan: Not tested, simple revert

Reviewers: kannan

Reviewed By: kannan

CC: cjin, hbase-eng@

Differential Revision: https://phabricator.fb.com/D607531

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1401278&r1=1401277&r2=1401278&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Tue Oct
23 12:50:37 2012
@@ -83,10 +83,8 @@ import java.util.TreeSet;
  * execute {@link #setCacheBlocks(boolean)}.
  */
 public class Scan extends Operation implements Writable {
-  private static final int VERSION_STORE_LIMIT = 2;
-  private static final int VERSION_STORE_OFFSET = 3;
-  private static final int VERSION_RESPONSE_SIZE = 4;
-  private static final byte SCAN_VERSION = VERSION_RESPONSE_SIZE;
+  private static final byte SCAN_VERSION = (byte)3;
+
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
   private int maxVersions = 1;
@@ -94,9 +92,6 @@ public class Scan extends Operation impl
   private int storeLimit = -1;
   private int storeOffset = 0;
   private int caching = -1;
-  private long maxResponseSize = -1;
-  private long currentPartialResponseSize = 0;
-  private boolean partialRow = false;
   private boolean cacheBlocks = true;
   private Filter filter = null;
   private TimeRange tr = new TimeRange();
@@ -334,25 +329,6 @@ public class Scan extends Operation impl
   }
 
   /**
-   * This is technically not the max available memory setting, more of a hint. 
-   * We will add KV's till we exceed this setting if partialRow is true, 
-   * and add entire rows till we exceed this setting if partialRow is false.
-   */
-  public void setResponseSetting(long responseSize, boolean partialRow) {
-    this.maxResponseSize = responseSize;
-    this.partialRow = partialRow;
-  }
-  
-  /**
-   * Set currentPartialResponseSize to accumulated response size 
-   * for all the KeyValue pairs collected so far
-   * @param responseSize
-   */
-  public void setCurrentPartialResponseSize(long responseSize) {
-    this.currentPartialResponseSize = responseSize;
-  }
-  
-  /**
    * Apply the specified server-side filter when performing the Scan.
    * @param filter filter to run on the server
    * @return this
@@ -436,24 +412,6 @@ public class Scan extends Operation impl
   }
 
   /**
-   * @return maximum response size that client can handle for a single call to next()
-   */
-  public long getMaxResponseSize() {
-    return this.maxResponseSize;
-  }
-  
-  public long getCurrentPartialResponseSize() {
-    return this.currentPartialResponseSize;
-  }
-  
-  /**
-   * @return whether the last row can be partially transferred for a single call to next()
-   */
-  public boolean getPartialRow() {
-      return this.partialRow;
-  }
-  
-  /**
    * @return maximum number of values to return per row per CF
    */
   public int getMaxResultsPerColumnFamily() {
@@ -565,8 +523,6 @@ public class Scan extends Operation impl
     map.put("caching", this.caching);
     map.put("cacheBlocks", this.cacheBlocks);
     map.put("storeLimit", this.storeLimit);
-    map.put("maxResponseSize", this.maxResponseSize);
-    map.put("partialRow", this.partialRow);
     List<Long> timeRange = new ArrayList<Long>();
     timeRange.add(this.tr.getMin());
     timeRange.add(this.tr.getMax());
@@ -623,16 +579,12 @@ public class Scan extends Operation impl
     this.stopRow = Bytes.readByteArray(in);
     this.maxVersions = in.readInt();
     this.batch = in.readInt();
-    if (version >= VERSION_STORE_LIMIT) {
+    if (version > 1) {
       this.storeLimit = in.readInt();
     }
-    if (version >= VERSION_STORE_OFFSET) {
+    if (version > 2) {
       this.storeOffset = in.readInt();
     }
-    if (version >= VERSION_RESPONSE_SIZE) {
-      this.maxResponseSize = in.readLong();
-      this.partialRow = in.readBoolean();
-    }
     this.caching = in.readInt();
     this.cacheBlocks = in.readBoolean();
     if(in.readBoolean()) {
@@ -659,30 +611,22 @@ public class Scan extends Operation impl
   public void write(final DataOutput out)
   throws IOException {
     byte version = (byte)1;
-    if (this.storeLimit != -1) {
-      version = (byte)Math.max(version, VERSION_STORE_LIMIT);
-    }
     if (this.storeOffset != 0) {
-      version = (byte)Math.max(version, VERSION_STORE_OFFSET);
-    }
-    if (this.maxResponseSize != -1) {
-      version = (byte)Math.max(version, VERSION_RESPONSE_SIZE);
+      version = SCAN_VERSION;
+    } else if (this.storeLimit != -1) {
+      version = 2;
     }
     out.writeByte(version);
     Bytes.writeByteArray(out, this.startRow);
     Bytes.writeByteArray(out, this.stopRow);
     out.writeInt(this.maxVersions);
     out.writeInt(this.batch);
-    if (version >= VERSION_STORE_LIMIT) {
+    if (version > 1) {
       out.writeInt(this.storeLimit);
     }
-    if (version >= VERSION_STORE_OFFSET) {
+    if (version > 2) {
       out.writeInt(this.storeOffset);
     }
-    if (version >= VERSION_RESPONSE_SIZE) {
-      out.writeLong(this.maxResponseSize);
-      out.writeBoolean(this.partialRow);
-    }
     out.writeInt(this.caching);
     out.writeBoolean(this.cacheBlocks);
     if(this.filter == null) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1401278&r1=1401277&r2=1401278&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Tue Oct 23 12:50:37 2012
@@ -150,8 +150,7 @@ public class HRegion implements HeapSize
   public static final Log LOG = LogFactory.getLog(HRegion.class);
   static final String SPLITDIR = "splits";
   static final String MERGEDIR = "merges";
-  private final long maxScannerResultSize;
-  
+
   static SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
   final AtomicBoolean closed = new AtomicBoolean(false);
   /* Closing can take some time; use the closing flag if there is stuff we don't
@@ -441,8 +440,6 @@ public class HRegion implements HeapSize
     this.threadWakeFrequency = 0L;
     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
     this.openDate = 0;
-    this.maxScannerResultSize = 
-        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; 
   }
 
   /**
@@ -452,7 +449,7 @@ public class HRegion implements HeapSize
    */
   public HRegion(HRegion other) {
     this(other.getTableDir(), other.getLog(), other.getFilesystem(),
-      other.baseConf, other.getRegionInfo(), null);
+        other.baseConf, other.getRegionInfo(), null);
   }
 
   /**
@@ -530,10 +527,6 @@ public class HRegion implements HeapSize
         conf.getBoolean(HConstants.HREGION_MEMSTORE_WAIT_ON_BLOCK, true);
     this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
 
-    this.maxScannerResultSize = conf.getLong(
-            HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
-            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
-
     this.readRequests =new RequestMetrics();
     this.writeRequests =new RequestMetrics();
   }
@@ -2911,7 +2904,7 @@ public class HRegion implements HeapSize
       lockedRows.notifyAll();
     }
   }
-  
+
   /**
    * See if row is currently locked.
    * @param lockid
@@ -3033,6 +3026,7 @@ public class HRegion implements HeapSize
       //DebugPrint.println("HRegionScanner.<init>");
 
       this.originalScan = scan;
+
       this.filter = scan.getFilter();
       this.batch = scan.getBatch();
       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
@@ -3083,104 +3077,41 @@ public class HRegion implements HeapSize
         throws IOException {
       return next(outResults, limit, null);
     }
-    
-    /**
-     * A method to return all the rows that can fit in the response size.
-     * it respects the two stop conditions:
-     * 1) scan.getMaxResponseSize
-     * 2) scan.getCaching()
-     * the loop breaks whoever comes first. 
-     * @param outResults returns a list of rows to nextRows()
-     * @param outKeyValues returns a list of keyvalues to get()
-     * @return true if there are more rows to fetch. 
-     */
-    private boolean nextCombine(List<Result> outResults, List<KeyValue> outKeyValues,
-        int nbRows, String metric) throws IOException {
-      if ((outResults == null) == (outKeyValues == null)) {
-       throw new AssertionError("Exactly one of outResults and outKeyValues "
-           + "must be null: outResultsIsNull=" + (outResults == null) + ", "
-           + "outKeyValuesIsNull=" + (outKeyValues == null));
-      }
 
+    @Override
+    public synchronized boolean next(List<KeyValue> outResults, int limit,
+        String metric) throws IOException {
       readRequests.incrTotalRequstCount();
       if (this.filterClosed) {
-        throw new UnknownScannerException("Scanner was closed (timed out?) "
-            + "after we renewed it. Could be caused by a very slow scanner "
-            + "or a lengthy garbage collection");
+        throw new UnknownScannerException("Scanner was closed (timed out?) " +
+            "after we renewed it. Could be caused by a very slow scanner " +
+            "or a lengthy garbage collection");
       }
       if (closing.get() || closed.get()) {
         close();
-        throw new NotServingRegionException(regionInfo.getRegionNameAsString()
-            + " is closing=" + closing.get() + " or closed=" + closed.get());
+        throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+          " is closing=" + closing.get() + " or closed=" + closed.get());
       }
+
       // This could be a new thread from the last time we called next().
       MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
-      List<KeyValue> tmpList = (outKeyValues != null && outKeyValues.isEmpty())
? 
-                  outKeyValues: new ArrayList<KeyValue>();
-      
-      int limit = this.getOriginalScan().getBatch();
-      int currentNbRows = 0;
-      long currentScanResultSize = 0;
-      boolean returnResult = true;
-      // This is necessary b/c partialResponseSize is not serialized through RPC
-      getOriginalScan().setCurrentPartialResponseSize(0);
-      long maxResponseSize = getOriginalScan().getMaxResponseSize();
-      while (true) {
+
+      boolean returnResult;
+      if (outResults.isEmpty()) {
+         // Usually outResults is empty. This is true when next is called
+         // to handle scan or get operation.
+        returnResult = nextInternal(outResults, limit, metric);
+      } else {
+        List<KeyValue> tmpList = new ArrayList<KeyValue>();
         returnResult = nextInternal(tmpList, limit, metric);
-        if (!tmpList.isEmpty()) {
-          if (maxScannerResultSize < Long.MAX_VALUE) {
-            for (KeyValue kv : tmpList) {
-              currentScanResultSize += kv.heapSize();
-            }
-          }
-          if (outResults != null) {
-            outResults.add(new Result(tmpList)); 
-            tmpList.clear();
-          } else if (tmpList != outKeyValues) {
-            outKeyValues.addAll(tmpList); 
-          }
-        }
-        
-        resetFilters();
-        if (isFilterDone()) {
-          return false;
-        }
-        
-        if (!returnResult) {
-          return false;
-        }
-        // if response size hits the limit, break the loop
-        if (getOriginalScan().getCurrentPartialResponseSize() >= maxResponseSize) {
-          return returnResult;
-        }
-        // if the size of all the keyvalue pairs exceeds maxScannerResultSize,
-        // or in the case just fetch nbRows
-        if (currentScanResultSize >= maxScannerResultSize || 
-            (maxResponseSize == -1 && ++currentNbRows >= nbRows)) {
-          return returnResult;
-        }
+        outResults.addAll(tmpList);
       }
-    }
-    
-    /**
-     * A method to return all the rows that can fit in the response size.  
-     * @param limit a variable that specifies the number of keyvalue pairs can be 
-     * returned per row
-     * @param limit limit on row count to get
-     * @param maxScannerResultSize a variable that specifies the maximum response size
-     *  for all the scan/get ops
-     * @return a boolean that indicates whether scan.next reaches the end.
-     * @return true if there are more rows, false if all scanners are done
-     */
-    public synchronized boolean nextRows(List<Result> outResults, int nbRows,
-        String metric) throws IOException {
-      return nextCombine(outResults, null, nbRows, metric);
-    }
-    
-    @Override
-    public synchronized boolean next(List<KeyValue> outKeyValues, int nbRows,
-        String metric) throws IOException {
-      return nextCombine(null, outKeyValues, nbRows, metric);
+
+      resetFilters();
+      if (isFilterDone()) {
+        return false;
+      }
+      return returnResult;
     }
 
     @Override
@@ -3213,10 +3144,7 @@ public class HRegion implements HeapSize
       if (!results.isEmpty()) {
         throw new IllegalArgumentException("First parameter should be an empty list");
       }
-      boolean partialRow = getOriginalScan().getPartialRow();
-      long maxResponseSize = getOriginalScan().getMaxResponseSize();
-      maxResponseSize = Math.min(maxScannerResultSize, maxResponseSize);
-      
+
       while (true) {
         byte [] currentRow = peekRow();
         if (isStopRow(currentRow)) {
@@ -3240,13 +3168,6 @@ public class HRegion implements HeapSize
                   "Filter with filterRow(List<KeyValue>) incompatible with scan with
limit!");
               return true; // we are expecting more yes, but also limited to how many we
can return.
             }
-            // if partialRow == false, it will fetch the entire row
-            // if the response size is filled up, return true 
-            if (maxResponseSize != -1 && 
-                (partialRow && getOriginalScan().getCurrentPartialResponseSize()
-                    >= maxResponseSize)){
-              return true;
-            }
           } while (Bytes.equals(currentRow, nextRow = peekRow()));
 
           final boolean stopRow = isStopRow(nextRow);
@@ -4025,6 +3946,7 @@ public class HRegion implements HeapSize
     return result;
   }
 
+
   //
   // New HBASE-880 Helpers
   //
@@ -4040,7 +3962,7 @@ public class HRegion implements HeapSize
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
       (2 * Bytes.SIZEOF_BOOLEAN) +
-      (7 * Bytes.SIZEOF_LONG) + 2 * ClassSize.ARRAY +
+      (6 * Bytes.SIZEOF_LONG) + 2 * ClassSize.ARRAY +
       (28 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1401278&r1=1401277&r2=1401278&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Tue Oct 23 12:50:37 2012
@@ -228,6 +228,8 @@ public class HRegionServer implements HR
 
   protected final int numRegionsToReport;
 
+  private final long maxScannerResultSize;
+
   // Remote HMaster
   private HMasterRegionInterface hbaseMaster;
 
@@ -389,6 +391,10 @@ public class HRegionServer implements HR
 
     sleeper = new Sleeper(this.msgInterval, this);
 
+    this.maxScannerResultSize = conf.getLong(
+            HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+
     // Task thread to process requests from Master
     this.worker = new Worker();
 
@@ -1296,10 +1302,10 @@ public class HRegionServer implements HR
   protected void metrics() {
     int numReads = this.numReads.get();
     this.numReads.addAndGet(-numReads);
-    
+
     int numWrites = this.numWrites.get();
     this.numWrites.addAndGet(-numWrites);
-    
+
     this.metrics.regions.set(this.onlineRegions.size());
     this.metrics.incrementRequests(numReads + numWrites);
     this.metrics.numReads.inc(numReads);
@@ -2531,9 +2537,7 @@ public class HRegionServer implements HR
   public Result [] next(final long scannerId, int nbRows) throws IOException {
     try {
       String scannerName = String.valueOf(scannerId);
-      // HRegionServer only deals with Region Scanner, 
-      // thus, we just typecast directly
-      HRegion.RegionScanner s = (HRegion.RegionScanner)this.scanners.get(scannerName);
+      InternalScanner s = this.scanners.get(scannerName);
       if (s == null) {
         throw new UnknownScannerException("Name: " + scannerName);
       }
@@ -2547,13 +2551,34 @@ public class HRegionServer implements HR
       }
       this.leases.renewLease(scannerName);
       List<Result> results = new ArrayList<Result>(nbRows);
-      s.nextRows(results, nbRows, HRegion.METRIC_NEXTSIZE);
-      numReads.addAndGet(results.size());
-      // IF its filter if any is done with the scan
+      long currentScanResultSize = 0;
+      List<KeyValue> values = new ArrayList<KeyValue>();
+      int i = 0;
+      for (; i < nbRows && currentScanResultSize < maxScannerResultSize; i++)
{
+        // Collect values to be returned here
+        boolean moreRows = s.next(values, HRegion.METRIC_NEXTSIZE);
+        if (!values.isEmpty()) {
+          if (maxScannerResultSize < Long.MAX_VALUE){
+            for (KeyValue kv : values) {
+              currentScanResultSize += kv.heapSize();
+            }
+          }
+          results.add(new Result(values));
+        }
+        if (!moreRows) {
+          break;
+        }
+        values.clear();
+      }
+      numReads.addAndGet(i);
+      // Below is an ugly hack where we cast the InternalScanner to be a
+      // HRegion.RegionScanner.  The alternative is to change InternalScanner
+      // interface but its used everywhere whereas we just need a bit of info
+      // from HRegion.RegionScanner, IF its filter if any is done with the scan
       // and wants to tell the client to stop the scan.  This is done by passing
       // a null result.
-      return s.isFilterDone() && results.isEmpty() ? null :
-        results.toArray(new Result[0]);
+      return ((HRegion.RegionScanner)s).isFilterDone() && results.isEmpty()?
+        null: results.toArray(new Result[0]);
     } catch (Throwable t) {
       if (t instanceof NotServingRegionException) {
         String scannerName = String.valueOf(scannerId);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1401278&r1=1401277&r2=1401278&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
(original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
Tue Oct 23 12:50:37 2012
@@ -348,11 +348,6 @@ public class StoreScanner extends NonLaz
         store != null ? store.getComparator() : null;
 
     long addedResultsSize = 0;
-    // set the responseSize so that it now can fetch records
-    // in terms of keyvalue's boundary rather than row's boundary
-    boolean partialRow = scan.getPartialRow();
-    long maxResponseSize = scan.getMaxResponseSize();
-    long partialResponseSize = scan.getCurrentPartialResponseSize();
     try {
       LOOP: while((kv = this.heap.peek()) != null) {
         // kv is no longer immutable due to KeyOnlyFilter! use copy for safety
@@ -403,12 +398,6 @@ public class StoreScanner extends NonLaz
                     + " Cannot allow  operations that fetch more than "
                     + HRegionServer.getResponseSizeLimit() + " bytes.");
                 throw new DoNotRetryIOException("Result too large");
-              } else if (maxResponseSize != -1) {
-                // If the user is strict about the response size and allows
-                // partialRow scanning, we only return the number of keyvalue 
-                // pairs to fill up the request size. Otherwise when partialRow 
-                // is false, we just fetch the entire row
-                partialResponseSize += copyKv.getLength();
               }
               outResult.add(copyKv);
               numNewKeyValues++;
@@ -425,9 +414,7 @@ public class StoreScanner extends NonLaz
             } else {
               this.heap.next();
             }
-            if (maxResponseSize != -1 && partialResponseSize >= maxResponseSize)
{
-              break LOOP;
-            }
+
             if (limit > 0 && (numNewKeyValues == limit)) {
               break LOOP;
             }
@@ -499,13 +486,8 @@ public class StoreScanner extends NonLaz
 
       throw e;
 
-    } finally { 
-      // update the remaining response size
-      if (maxResponseSize != -1) {
-        partialResponseSize = Math.min(maxResponseSize, partialResponseSize);
-        scan.setCurrentPartialResponseSize(partialResponseSize);
-      }
-      // update the counter 
+    } finally {
+      // update the counter
       if (addedResultsSize > 0 && metric != null) {
         HRegion.incrNumericMetric(this.metricNamePrefix + metric,
             addedResultsSize);

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java?rev=1401278&r1=1401277&r2=1401278&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java
(original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java
Tue Oct 23 12:50:37 2012
@@ -1,213 +0,0 @@
-/**
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueTestUtil;
-import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
-import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-
-public class TestMaxResponseSize{
-  final Log LOG = LogFactory.getLog(getClass());
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static byte [] ROW = Bytes.toBytes("testRow");
-  private static byte [] FAMILY = Bytes.toBytes("testFamily");
-  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
-  private static byte [] VALUE = Bytes.toBytes("testValue");
-  private static int SLAVES = 3;
-  /**
-   * @throws java.lang.Exception
-   */
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.startMiniCluster(SLAVES);
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @Before
-  public void setUp() throws Exception {
-    // Nothing to do.
-  }
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @After
-  public void tearDown() throws Exception {
-    // Nothing to do.
-  }
-
-  public void createTestTable() throws Exception {
-  }
-  /**
-   * Test from client side for scan with responseSize and partialRow
-   * responseSize is small value
-   * @throws Exception
-   */
-  
-  @Test
-  public void testScanMaxRequstSize() throws Exception {    
-    byte [] TABLE = Bytes.toBytes("testScanMaxRequstSize");
-    byte [][] ROWS= makeNAscii(ROW, 3);
-    byte [][] FAMILIES = makeNAscii(FAMILY, 3);
-    byte [][] QUALIFIERS = makeNAscii(QUALIFIER, 10);
-
-    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES);
-    List<KeyValue> kvListExp = new ArrayList<KeyValue>();  
-
-    Put put;
-    long kvSize = (new KeyValue(ROWS[0], FAMILIES[0], QUALIFIERS[0], 1, VALUE))
-      .getLength();
-    long rowSize = (kvSize * (FAMILIES.length * QUALIFIERS.length));
-    
-    for (int r=0; r < ROWS.length; r++) {
-      put = new Put(ROWS[r]);
-      for (int c=0; c < FAMILIES.length; c++) { 
-        for (int q=0; q < QUALIFIERS.length; q++) { 
-          KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE);
-          put.add(kv);
-          kvListExp.add(kv);
-        }
-      }
-      ht.put(put);
-    }
-    
-    /**
-     * Test with the small bufferSize that is smaller rowSize 
-     * The response size is set to only fit half a row.
-     * if partialRow == true, the expected number of fetches is 6 in order to 
-     * retrieve all the 3 rows, otherwise we can fetch an entire row each time,
-     * which makes the expected number to be 3.
-     */
-    long responseSize = rowSize / 2;
-    long scanCntExp = ((rowSize + responseSize - 1) / responseSize) * ROWS.length;
-    testScan(ht, rowSize, kvSize, scanCntExp, kvListExp, responseSize, true);
-    
-    scanCntExp = ROWS.length;
-    testScan(ht, rowSize, kvSize, scanCntExp, kvListExp, responseSize, false);
-    
-    /**
-     * Test with a big responseSize across mutliple rows
-     * The response size is set to only fit one and a half rows.
-     * if partialRow == true, the expected number of fetches is 4 
-     * in order to retrieve all the 3 rows. 
-     * if partialRow == false, the exptect number of fetch is 3 since each time 
-     * it still fetches entire row at a time.
-     */
-    responseSize = rowSize + rowSize / 2 ;
-    long nbRows = (responseSize + rowSize - 1)/ rowSize; 
-    scanCntExp = (ROWS.length + nbRows - 1) / nbRows * nbRows;
-    testScan(ht, rowSize, kvSize, scanCntExp, kvListExp, responseSize, true);
-    
-    scanCntExp = ROWS.length;
-    testScan(ht, rowSize, kvSize, scanCntExp, kvListExp, responseSize, false);
-
-  } 
-
-  void testScan(HTable ht, long rowSize, long kvSize,
-      long scanCntExp, List<KeyValue> kvListExp, 
-      long responseSize, boolean partialRow) throws Exception{ 
-    long availResponseSize = responseSize;
-    long kvNumPerRow = rowSize/kvSize;
-    int scanCntAct = 0;
-    boolean toLog  = true; 
-    Scan scan = new Scan();
-    Result result;
-    ResultScanner scanner;
-    List<KeyValue> kvListScan = new ArrayList<KeyValue>();  
-    scan.setResponseSetting(responseSize, partialRow);
-    scanner = ht.getScanner(scan);
-    kvListScan.clear();
-    while ((result = scanner.next()) != null) {
-      scanCntAct++;
-      for (KeyValue kv : result.list()) {
-        kvListScan.add(kv);
-      }
-    }
-    
-    System.out.println("total number of scans: " + scanCntAct + ", "
-        + scanCntExp+ ","+responseSize+","+partialRow);
-    assertEquals(scanCntExp, scanCntAct);
-    result = new Result(kvListScan);
-    verifyResult(result, kvListExp, toLog, 
-        "Testing scan with responseSize = " + responseSize + 
-        ", partialRow = " + partialRow);
-  }
-  
-  private void verifyResult(Result result, List<KeyValue> kvList, boolean toLog,
-    String msg) {
-    LOG.info(msg);
-    LOG.info("Exp cnt: " + kvList.size());
-    LOG.info("True cnt is: " + result.size());	
-    assertEquals(kvList.size(), result.size());
-
-    if (kvList.size() == 0) return;
-    int i = 0;
-    for (KeyValue kv : result.sorted()) {
-      KeyValue kvExp = kvList.get(i++);
-      if (toLog) {
-        LOG.info("get kv is: " + kv.toString());
-        LOG.info("exp kv is: " + kvExp.toString());	  
-      }
-      assertTrue("Not equal", kvExp.equals(kv));
-    }
-
-  }
-
-  private byte [][] makeNAscii(byte [] base, int n) {
-    byte [][] ret = new byte[n][];
-    for(int i=0;i<n;i++) {
-      byte [] tail = Bytes.toBytes(Integer.toString(i));
-      ret[i] = Bytes.add(base, tail);
-    }
-    return ret;
-  }
-  
-}



Mime
View raw message