Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 00945D22D for ; Sat, 20 Oct 2012 12:53:02 +0000 (UTC) Received: (qmail 55174 invoked by uid 500); 20 Oct 2012 12:53:01 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 54925 invoked by uid 500); 20 Oct 2012 12:52:59 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 54890 invoked by uid 99); 20 Oct 2012 12:52:58 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 20 Oct 2012 12:52:58 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 20 Oct 2012 12:52:52 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9F16623888EA for ; Sat, 20 Oct 2012 12:52:07 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1400430 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/client/ Date: Sat, 20 Oct 2012 12:52:07 -0000 To: commits@hbase.apache.org From: mbautin@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121020125207.9F16623888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mbautin Date: Sat Oct 20 12:52:06 2012 New Revision: 1400430 URL: http://svn.apache.org/viewvc?rev=1400430&view=rev Log: JIRA HBASE-6770: Allow scanner 
to specify the response size Author: cjin Summary: Allow a scanner to specify the maximum response size directly, rather than setting both batch and caching to control the response size; the scanner can also specify whether a row may be truncated (returned partially) or not. Test Plan: ran the unit tests and added TestMaxResponseSize.java to test the new fields (maxResponseSize, partialRow) Reviewers: aaiyer, kranganathan, mbautin Reviewed By: mbautin CC: hbase-eng@ Differential Revision: https://phabricator.fb.com/D594373 Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1400430&r1=1400429&r2=1400430&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Sat Oct 20 12:52:06 2012 @@ -83,8 +83,10 @@ import java.util.TreeSet; * execute {@link #setCacheBlocks(boolean)}. 
*/ public class Scan extends Operation implements Writable { - private static final byte SCAN_VERSION = (byte)3; - + private static final int VERSION_STORE_LIMIT = 2; + private static final int VERSION_STORE_OFFSET = 3; + private static final int VERSION_RESPONSE_SIZE = 4; + private static final byte SCAN_VERSION = VERSION_RESPONSE_SIZE; private byte [] startRow = HConstants.EMPTY_START_ROW; private byte [] stopRow = HConstants.EMPTY_END_ROW; private int maxVersions = 1; @@ -92,6 +94,9 @@ public class Scan extends Operation impl private int storeLimit = -1; private int storeOffset = 0; private int caching = -1; + private long maxResponseSize = -1; + private long currentPartialResponseSize = 0; + private boolean partialRow = false; private boolean cacheBlocks = true; private Filter filter = null; private TimeRange tr = new TimeRange(); @@ -329,6 +334,25 @@ public class Scan extends Operation impl } /** + * This is technically not the max available memory setting, more of a hint. + * We will add KV's till we exceed this setting if partialRow is true, + * and add entire rows till we exceed this setting if partialRow is false. + */ + public void setResponseSetting(long responseSize, boolean partialRow) { + this.maxResponseSize = responseSize; + this.partialRow = partialRow; + } + + /** + * Set currentPartialResponseSize to accumulated response size + * for all the KeyValue pairs collected so far + * @param responseSize + */ + public void setCurrentPartialResponseSize(long responseSize) { + this.currentPartialResponseSize = responseSize; + } + + /** * Apply the specified server-side filter when performing the Scan. 
* @param filter filter to run on the server * @return this @@ -412,6 +436,24 @@ public class Scan extends Operation impl } /** + * @return maximum response size that client can handle for a single call to next() + */ + public long getMaxResponseSize() { + return this.maxResponseSize; + } + + public long getCurrentPartialResponseSize() { + return this.currentPartialResponseSize; + } + + /** + * @return whether the last row can be partially transferred for a single call to next() + */ + public boolean getPartialRow() { + return this.partialRow; + } + + /** * @return maximum number of values to return per row per CF */ public int getMaxResultsPerColumnFamily() { @@ -523,6 +565,8 @@ public class Scan extends Operation impl map.put("caching", this.caching); map.put("cacheBlocks", this.cacheBlocks); map.put("storeLimit", this.storeLimit); + map.put("maxResponseSize", this.maxResponseSize); + map.put("partialRow", this.partialRow); List timeRange = new ArrayList(); timeRange.add(this.tr.getMin()); timeRange.add(this.tr.getMax()); @@ -579,12 +623,16 @@ public class Scan extends Operation impl this.stopRow = Bytes.readByteArray(in); this.maxVersions = in.readInt(); this.batch = in.readInt(); - if (version > 1) { + if (version >= VERSION_STORE_LIMIT) { this.storeLimit = in.readInt(); } - if (version > 2) { + if (version >= VERSION_STORE_OFFSET) { this.storeOffset = in.readInt(); } + if (version >= VERSION_RESPONSE_SIZE) { + this.maxResponseSize = in.readLong(); + this.partialRow = in.readBoolean(); + } this.caching = in.readInt(); this.cacheBlocks = in.readBoolean(); if(in.readBoolean()) { @@ -611,22 +659,30 @@ public class Scan extends Operation impl public void write(final DataOutput out) throws IOException { byte version = (byte)1; + if (this.storeLimit != -1) { + version = (byte)Math.max(version, VERSION_STORE_LIMIT); + } if (this.storeOffset != 0) { - version = SCAN_VERSION; - } else if (this.storeLimit != -1) { - version = 2; + version = (byte)Math.max(version, 
VERSION_STORE_OFFSET); + } + if (this.maxResponseSize != -1) { + version = (byte)Math.max(version, VERSION_RESPONSE_SIZE); } out.writeByte(version); Bytes.writeByteArray(out, this.startRow); Bytes.writeByteArray(out, this.stopRow); out.writeInt(this.maxVersions); out.writeInt(this.batch); - if (version > 1) { + if (version >= VERSION_STORE_LIMIT) { out.writeInt(this.storeLimit); } - if (version > 2) { + if (version >= VERSION_STORE_OFFSET) { out.writeInt(this.storeOffset); } + if (version >= VERSION_RESPONSE_SIZE) { + out.writeLong(this.maxResponseSize); + out.writeBoolean(this.partialRow); + } out.writeInt(this.caching); out.writeBoolean(this.cacheBlocks); if(this.filter == null) { Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1400430&r1=1400429&r2=1400430&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Oct 20 12:52:06 2012 @@ -150,7 +150,8 @@ public class HRegion implements HeapSize public static final Log LOG = LogFactory.getLog(HRegion.class); static final String SPLITDIR = "splits"; static final String MERGEDIR = "merges"; - + private final long maxScannerResultSize; + static SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); final AtomicBoolean closed = new AtomicBoolean(false); /* Closing can take some time; use the closing flag if there is stuff we don't @@ -440,6 +441,8 @@ public class HRegion implements HeapSize this.threadWakeFrequency = 0L; this.scannerReadPoints = new ConcurrentHashMap(); this.openDate = 0; + this.maxScannerResultSize = + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE; } /** @@ -449,7 +452,7 @@ 
public class HRegion implements HeapSize */ public HRegion(HRegion other) { this(other.getTableDir(), other.getLog(), other.getFilesystem(), - other.baseConf, other.getRegionInfo(), null); + other.baseConf, other.getRegionInfo(), null); } /** @@ -527,6 +530,10 @@ public class HRegion implements HeapSize conf.getBoolean(HConstants.HREGION_MEMSTORE_WAIT_ON_BLOCK, true); this.scannerReadPoints = new ConcurrentHashMap(); + this.maxScannerResultSize = conf.getLong( + HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + this.readRequests =new RequestMetrics(); this.writeRequests =new RequestMetrics(); } @@ -2904,7 +2911,7 @@ public class HRegion implements HeapSize lockedRows.notifyAll(); } } - + /** * See if row is currently locked. * @param lockid @@ -3026,7 +3033,6 @@ public class HRegion implements HeapSize //DebugPrint.println("HRegionScanner."); this.originalScan = scan; - this.filter = scan.getFilter(); this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { @@ -3077,41 +3083,104 @@ public class HRegion implements HeapSize throws IOException { return next(outResults, limit, null); } + + /** + * A method to return all the rows that can fit in the response size. + * it respects the two stop conditions: + * 1) scan.getMaxResponseSize + * 2) scan.getCaching() + * the loop breaks whoever comes first. + * @param outResults returns a list of rows to nextRows() + * @param outKeyValues returns a list of keyvalues to get() + * @return true if there are more rows to fetch. 
+ */ + private boolean nextCombine(List outResults, List outKeyValues, + int nbRows, String metric) throws IOException { + if ((outResults == null) == (outKeyValues == null)) { + throw new AssertionError("Exactly one of outResults and outKeyValues " + + "must be null: outResultsIsNull=" + (outResults == null) + ", " + + "outKeyValuesIsNull=" + (outKeyValues == null)); + } - @Override - public synchronized boolean next(List outResults, int limit, - String metric) throws IOException { readRequests.incrTotalRequstCount(); if (this.filterClosed) { - throw new UnknownScannerException("Scanner was closed (timed out?) " + - "after we renewed it. Could be caused by a very slow scanner " + - "or a lengthy garbage collection"); + throw new UnknownScannerException("Scanner was closed (timed out?) " + + "after we renewed it. Could be caused by a very slow scanner " + + "or a lengthy garbage collection"); } if (closing.get() || closed.get()) { close(); - throw new NotServingRegionException(regionInfo.getRegionNameAsString() + - " is closing=" + closing.get() + " or closed=" + closed.get()); + throw new NotServingRegionException(regionInfo.getRegionNameAsString() + + " is closing=" + closing.get() + " or closed=" + closed.get()); } - // This could be a new thread from the last time we called next(). MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); - - boolean returnResult; - if (outResults.isEmpty()) { - // Usually outResults is empty. This is true when next is called - // to handle scan or get operation. - returnResult = nextInternal(outResults, limit, metric); - } else { - List tmpList = new ArrayList(); + List tmpList = (outKeyValues != null && outKeyValues.isEmpty()) ? 
+ outKeyValues: new ArrayList(); + + int limit = this.getOriginalScan().getBatch(); + int currentNbRows = 0; + long currentScanResultSize = 0; + boolean returnResult = true; + // This is necessary b/c partialResponseSize is not serialized through RPC + getOriginalScan().setCurrentPartialResponseSize(0); + long maxResponseSize = getOriginalScan().getMaxResponseSize(); + while (true) { returnResult = nextInternal(tmpList, limit, metric); - outResults.addAll(tmpList); - } - - resetFilters(); - if (isFilterDone()) { - return false; + if (!tmpList.isEmpty()) { + if (maxScannerResultSize < Long.MAX_VALUE) { + for (KeyValue kv : tmpList) { + currentScanResultSize += kv.heapSize(); + } + } + if (outResults != null) { + outResults.add(new Result(tmpList)); + tmpList.clear(); + } else if (tmpList != outKeyValues) { + outKeyValues.addAll(tmpList); + } + } + + resetFilters(); + if (isFilterDone()) { + return false; + } + + if (!returnResult) { + return false; + } + // if response size hits the limit, break the loop + if (getOriginalScan().getCurrentPartialResponseSize() >= maxResponseSize) { + return returnResult; + } + // if the size of all the keyvalue pairs exceeds maxScannerResultSize, + // or in the case just fetch nbRows + if (currentScanResultSize >= maxScannerResultSize || + (maxResponseSize == -1 && ++currentNbRows >= nbRows)) { + return returnResult; + } } - return returnResult; + } + + /** + * A method to return all the rows that can fit in the response size. + * @param limit a variable that specifies the number of keyvalue pairs can be + * returned per row + * @param limit limit on row count to get + * @param maxScannerResultSize a variable that specifies the maximum response size + * for all the scan/get ops + * @return a boolean that indicates whether scan.next reaches the end. 
+ * @return true if there are more rows, false if all scanners are done + */ + public synchronized boolean nextRows(List outResults, int nbRows, + String metric) throws IOException { + return nextCombine(outResults, null, nbRows, metric); + } + + @Override + public synchronized boolean next(List outKeyValues, int nbRows, + String metric) throws IOException { + return nextCombine(null, outKeyValues, nbRows, metric); } @Override @@ -3144,7 +3213,10 @@ public class HRegion implements HeapSize if (!results.isEmpty()) { throw new IllegalArgumentException("First parameter should be an empty list"); } - + boolean partialRow = getOriginalScan().getPartialRow(); + long maxResponseSize = getOriginalScan().getMaxResponseSize(); + maxResponseSize = Math.min(maxScannerResultSize, maxResponseSize); + while (true) { byte [] currentRow = peekRow(); if (isStopRow(currentRow)) { @@ -3168,6 +3240,13 @@ public class HRegion implements HeapSize "Filter with filterRow(List) incompatible with scan with limit!"); return true; // we are expecting more yes, but also limited to how many we can return. 
} + // if partialRow == false, it will fetch the entire row + // if the response size is filled up, return true + if (maxResponseSize != -1 && + (partialRow && getOriginalScan().getCurrentPartialResponseSize() + >= maxResponseSize)){ + return true; + } } while (Bytes.equals(currentRow, nextRow = peekRow())); final boolean stopRow = isStopRow(nextRow); @@ -3946,7 +4025,6 @@ public class HRegion implements HeapSize return result; } - // // New HBASE-880 Helpers // @@ -3962,7 +4040,7 @@ public class HRegion implements HeapSize public static final long FIXED_OVERHEAD = ClassSize.align( (2 * Bytes.SIZEOF_BOOLEAN) + - (6 * Bytes.SIZEOF_LONG) + 2 * ClassSize.ARRAY + + (7 * Bytes.SIZEOF_LONG) + 2 * ClassSize.ARRAY + (28 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1400430&r1=1400429&r2=1400430&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Oct 20 12:52:06 2012 @@ -228,8 +228,6 @@ public class HRegionServer implements HR protected final int numRegionsToReport; - private final long maxScannerResultSize; - // Remote HMaster private HMasterRegionInterface hbaseMaster; @@ -390,10 +388,6 @@ public class HRegionServer implements HR sleeper = new Sleeper(this.msgInterval, this); - this.maxScannerResultSize = conf.getLong( - HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - // Task thread to process requests from Master this.worker = new Worker(); @@ 
-1299,10 +1293,10 @@ public class HRegionServer implements HR protected void metrics() { int numReads = this.numReads.get(); this.numReads.addAndGet(-numReads); - + int numWrites = this.numWrites.get(); this.numWrites.addAndGet(-numWrites); - + this.metrics.regions.set(this.onlineRegions.size()); this.metrics.incrementRequests(numReads + numWrites); this.metrics.numReads.inc(numReads); @@ -2534,7 +2528,9 @@ public class HRegionServer implements HR public Result [] next(final long scannerId, int nbRows) throws IOException { try { String scannerName = String.valueOf(scannerId); - InternalScanner s = this.scanners.get(scannerName); + // HRegionServer only deals with Region Scanner, + // thus, we just typecast directly + HRegion.RegionScanner s = (HRegion.RegionScanner)this.scanners.get(scannerName); if (s == null) { throw new UnknownScannerException("Name: " + scannerName); } @@ -2548,34 +2544,13 @@ public class HRegionServer implements HR } this.leases.renewLease(scannerName); List results = new ArrayList(nbRows); - long currentScanResultSize = 0; - List values = new ArrayList(); - int i = 0; - for (; i < nbRows && currentScanResultSize < maxScannerResultSize; i++) { - // Collect values to be returned here - boolean moreRows = s.next(values, HRegion.METRIC_NEXTSIZE); - if (!values.isEmpty()) { - if (maxScannerResultSize < Long.MAX_VALUE){ - for (KeyValue kv : values) { - currentScanResultSize += kv.heapSize(); - } - } - results.add(new Result(values)); - } - if (!moreRows) { - break; - } - values.clear(); - } - numReads.addAndGet(i); - // Below is an ugly hack where we cast the InternalScanner to be a - // HRegion.RegionScanner. 
The alternative is to change InternalScanner - // interface but its used everywhere whereas we just need a bit of info - // from HRegion.RegionScanner, IF its filter if any is done with the scan + s.nextRows(results, nbRows, HRegion.METRIC_NEXTSIZE); + numReads.addAndGet(results.size()); + // IF its filter if any is done with the scan // and wants to tell the client to stop the scan. This is done by passing // a null result. - return ((HRegion.RegionScanner)s).isFilterDone() && results.isEmpty()? - null: results.toArray(new Result[0]); + return s.isFilterDone() && results.isEmpty() ? null : + results.toArray(new Result[0]); } catch (Throwable t) { if (t instanceof NotServingRegionException) { String scannerName = String.valueOf(scannerId); Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1400430&r1=1400429&r2=1400430&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Oct 20 12:52:06 2012 @@ -345,6 +345,11 @@ class StoreScanner extends NonLazyKeyVal store != null ? store.getComparator() : null; long addedResultsSize = 0; + // set the responseSize so that it now can fetch records + // in terms of keyvalue's boundary rather than row's boundary + boolean partialRow = scan.getPartialRow(); + long maxResponseSize = scan.getMaxResponseSize(); + long partialResponseSize = scan.getCurrentPartialResponseSize(); try { LOOP: while((kv = this.heap.peek()) != null) { // kv is no longer immutable due to KeyOnlyFilter! 
use copy for safety @@ -395,6 +400,12 @@ class StoreScanner extends NonLazyKeyVal + " Cannot allow operations that fetch more than " + HRegionServer.getResponseSizeLimit() + " bytes."); throw new DoNotRetryIOException("Result too large"); + } else if (maxResponseSize != -1) { + // If the user is strict about the response size and allows + // partialRow scanning, we only return the number of keyvalue + // pairs to fill up the request size. Otherwise when partialRow + // is false, we just fetch the entire row + partialResponseSize += copyKv.getLength(); } outResult.add(copyKv); numNewKeyValues++; @@ -411,7 +422,9 @@ class StoreScanner extends NonLazyKeyVal } else { this.heap.next(); } - + if (maxResponseSize != -1 && partialResponseSize >= maxResponseSize) { + break LOOP; + } if (limit > 0 && (numNewKeyValues == limit)) { break LOOP; } @@ -483,8 +496,13 @@ class StoreScanner extends NonLazyKeyVal throw e; - } finally { - // update the counter + } finally { + // update the remaining response size + if (maxResponseSize != -1) { + partialResponseSize = Math.min(maxResponseSize, partialResponseSize); + scan.setCurrentPartialResponseSize(partialResponseSize); + } + // update the counter if (addedResultsSize > 0 && metric != null) { HRegion.incrNumericMetric(this.metricNamePrefix + metric, addedResultsSize); Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java?rev=1400430&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java (added) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java Sat Oct 20 12:52:06 2012 @@ -0,0 +1,213 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueTestUtil; +import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; +import org.apache.hadoop.hbase.filter.ColumnRangeFilter; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + + +public class TestMaxResponseSize{ + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static byte [] ROW = Bytes.toBytes("testRow"); + private static byte [] FAMILY = Bytes.toBytes("testFamily"); + private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte [] VALUE = 
Bytes.toBytes("testValue"); + private static int SLAVES = 3; + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(SLAVES); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + // Nothing to do. + } + + public void createTestTable() throws Exception { + } + /** + * Test from client side for scan with responseSize and partialRow + * responseSize is small value + * @throws Exception + */ + + @Test + public void testScanMaxRequstSize() throws Exception { + byte [] TABLE = Bytes.toBytes("testScanMaxRequstSize"); + byte [][] ROWS= makeNAscii(ROW, 3); + byte [][] FAMILIES = makeNAscii(FAMILY, 3); + byte [][] QUALIFIERS = makeNAscii(QUALIFIER, 10); + + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES); + List kvListExp = new ArrayList(); + + Put put; + long kvSize = (new KeyValue(ROWS[0], FAMILIES[0], QUALIFIERS[0], 1, VALUE)) + .getLength(); + long rowSize = (kvSize * (FAMILIES.length * QUALIFIERS.length)); + + for (int r=0; r < ROWS.length; r++) { + put = new Put(ROWS[r]); + for (int c=0; c < FAMILIES.length; c++) { + for (int q=0; q < QUALIFIERS.length; q++) { + KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); + put.add(kv); + kvListExp.add(kv); + } + } + ht.put(put); + } + + /** + * Test with the small bufferSize that is smaller rowSize + * The response size is set to only fit half a row. + * if partialRow == true, the expected number of fetches is 6 in order to + * retrieve all the 3 rows, otherwise we can fetch an entire row each time, + * which makes the expected number to be 3. 
+ */ + long responseSize = rowSize / 2; + long scanCntExp = ((rowSize + responseSize - 1) / responseSize) * ROWS.length; + testScan(ht, rowSize, kvSize, scanCntExp, kvListExp, responseSize, true); + + scanCntExp = ROWS.length; + testScan(ht, rowSize, kvSize, scanCntExp, kvListExp, responseSize, false); + + /** + * Test with a big responseSize across mutliple rows + * The response size is set to only fit one and a half rows. + * if partialRow == true, the expected number of fetches is 4 + * in order to retrieve all the 3 rows. + * if partialRow == false, the exptect number of fetch is 3 since each time + * it still fetches entire row at a time. + */ + responseSize = rowSize + rowSize / 2 ; + long nbRows = (responseSize + rowSize - 1)/ rowSize; + scanCntExp = (ROWS.length + nbRows - 1) / nbRows * nbRows; + testScan(ht, rowSize, kvSize, scanCntExp, kvListExp, responseSize, true); + + scanCntExp = ROWS.length; + testScan(ht, rowSize, kvSize, scanCntExp, kvListExp, responseSize, false); + + } + + void testScan(HTable ht, long rowSize, long kvSize, + long scanCntExp, List kvListExp, + long responseSize, boolean partialRow) throws Exception{ + long availResponseSize = responseSize; + long kvNumPerRow = rowSize/kvSize; + int scanCntAct = 0; + boolean toLog = true; + Scan scan = new Scan(); + Result result; + ResultScanner scanner; + List kvListScan = new ArrayList(); + scan.setResponseSetting(responseSize, partialRow); + scanner = ht.getScanner(scan); + kvListScan.clear(); + while ((result = scanner.next()) != null) { + scanCntAct++; + for (KeyValue kv : result.list()) { + kvListScan.add(kv); + } + } + + System.out.println("total number of scans: " + scanCntAct + ", " + + scanCntExp+ ","+responseSize+","+partialRow); + assertEquals(scanCntExp, scanCntAct); + result = new Result(kvListScan); + verifyResult(result, kvListExp, toLog, + "Testing scan with responseSize = " + responseSize + + ", partialRow = " + partialRow); + } + + private void verifyResult(Result result, List 
kvList, boolean toLog, + String msg) { + LOG.info(msg); + LOG.info("Exp cnt: " + kvList.size()); + LOG.info("True cnt is: " + result.size()); + assertEquals(kvList.size(), result.size()); + + if (kvList.size() == 0) return; + int i = 0; + for (KeyValue kv : result.sorted()) { + KeyValue kvExp = kvList.get(i++); + if (toLog) { + LOG.info("get kv is: " + kv.toString()); + LOG.info("exp kv is: " + kvExp.toString()); + } + assertTrue("Not equal", kvExp.equals(kv)); + } + + } + + private byte [][] makeNAscii(byte [] base, int n) { + byte [][] ret = new byte[n][]; + for(int i=0;i