Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B49F8D996 for ; Sat, 10 Nov 2012 00:02:14 +0000 (UTC) Received: (qmail 94226 invoked by uid 500); 10 Nov 2012 00:02:14 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 94175 invoked by uid 500); 10 Nov 2012 00:02:14 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 94168 invoked by uid 99); 10 Nov 2012 00:02:14 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Nov 2012 00:02:14 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Nov 2012 00:02:08 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id A1D7C23888EA for ; Sat, 10 Nov 2012 00:01:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1407699 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hb... 
Date: Sat, 10 Nov 2012 00:01:44 -0000 To: commits@hbase.apache.org From: liyin@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121110000145.A1D7C23888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: liyin Date: Sat Nov 10 00:01:42 2012 New Revision: 1407699 URL: http://svn.apache.org/viewvc?rev=1407699&view=rev Log: JIRA [HBASE-6770]: Allow scanner to specify the response size Author: cjin Summary: Allow scanner to specify the request size rather than set both batch and caching to control the response size, and it can also specify whether the row can be truncated or not Test Plan: unitTest, add TestMaxResponseSize.java to test the new fields (maxRequestSize, partialRow) and add TestHRegionNextRows() in TestHRegion.java Reviewers: aaiyer, kranganathan, kannan, liyintang Reviewed By: kannan CC: hbase-eng@, alex Differential Revision: https://phabricator.fb.com/D607836 Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1407699&r1=1407698&r2=1407699&view=diff 
============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sat Nov 10 00:01:42 2012 @@ -472,7 +472,7 @@ public final class HConstants { * scanner's next method. */ public static String HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY = "hbase.client.scanner.max.result.size"; - + /** * Maximum number of bytes returned when calling a scanner's next method. * Note that when a single row is larger than this limit the row is still @@ -484,6 +484,15 @@ public final class HConstants { /** + * Maximum number of bytes returned when calling a scanner's next method. + * Used with partialRow parameter on the client side. Note that when a + * single row is larger than this limit, the row is still returned completely + * if partialRow is false, otherwise, the row will be truncated in order to + * fit the memory. + */ + public static int DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE = Integer.MAX_VALUE; + + /** * HRegion server lease period in milliseconds. Clients must report in within this period * else they are considered dead. Unit measured in ms (milliseconds). 
*/ Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1407699&r1=1407698&r2=1407699&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Sat Nov 10 00:01:42 2012 @@ -1065,14 +1065,13 @@ public class HTable implements HTableInt } if (cache.size() == 0) { Result [] values = null; - long remainingResultSize = maxScannerResultSize; - int countdown = this.caching; // We need to reset it if it's a new callable that was created // with a countdown in nextScanner callable.setCaching(this.caching); // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. boolean skipFirst = false; + boolean foundResults = false; do { try { // Server returns a null values if scanning is to stop. 
Else, @@ -1094,7 +1093,7 @@ public class HTable implements HTableInt long elapsed = System.currentTimeMillis() - lastNext; ScannerTimeoutException ex = new ScannerTimeoutException( elapsed + "ms passed since the last invocation, " + - "timeout is currently set to " + scannerTimeout); + "timeout is currently set to " + scannerTimeout); ex.initCause(e); throw ex; } @@ -1114,23 +1113,17 @@ public class HTable implements HTableInt } // Clear region this.currentRegion = null; - continue; } lastNext = System.currentTimeMillis(); if (values != null && values.length > 0) { + foundResults = true; for (Result rs : values) { cache.add(rs); - for (KeyValue kv : rs.raw()) { - remainingResultSize -= kv.heapSize(); - } - countdown--; this.lastResult = rs; } } - // Values == null means server-side filter has determined we must STOP - } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null)); + } while (!foundResults && nextScanner(this.caching, values == null)); } - if (cache.size() > 0) { return cache.poll(); } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1407699&r1=1407698&r2=1407699&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Sat Nov 10 00:01:42 2012 @@ -83,8 +83,11 @@ import java.util.TreeSet; * execute {@link #setCacheBlocks(boolean)}. 
*/ public class Scan extends Operation implements Writable { - private static final byte SCAN_VERSION = (byte)3; - + private static final int VERSION_STORE_LIMIT = 2; + private static final int VERSION_STORE_OFFSET = 3; + private static final int VERSION_RESPONSE_SIZE = 4; + private static final byte SCAN_VERSION = VERSION_RESPONSE_SIZE; + private byte [] startRow = HConstants.EMPTY_START_ROW; private byte [] stopRow = HConstants.EMPTY_END_ROW; private int maxVersions = 1; @@ -92,6 +95,9 @@ public class Scan extends Operation impl private int storeLimit = -1; private int storeOffset = 0; private int caching = -1; + private int maxResponseSize = HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE; + private int currentPartialResponseSize = 0; + private boolean partialRow = false; private boolean cacheBlocks = true; private Filter filter = null; private TimeRange tr = new TimeRange(); @@ -331,6 +337,55 @@ public class Scan extends Operation impl */ public void setCaching(int caching) { this.caching = caching; + this.partialRow = false; + this.maxResponseSize = HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE; + } + + /** + * This is technically not the max available memory setting, more of a hint. + * We will add KV's till we exceed this setting if partialRow is true, + * and add entire rows till we exceed this setting if partialRow is false. + * !!!NOTE!!!: this call will overwrite the caching setting and set it as + * int.max_value. If you really want row-based constraint as well, use + * setCaching(int caching), which will reset maxResponseSize to match your + * configuration and disable partial row. 
+ */ + public void setCaching(int responseSize, boolean partialRow) { + this.maxResponseSize = responseSize; + this.partialRow = partialRow; + this.caching = Integer.MAX_VALUE; + } + + /** + * @return maximum response size that client can handle for a single call to next() + */ + public int getMaxResponseSize() { + return this.maxResponseSize; + } + + /** + * @return whether the last row can be partially transferred for a single call to next() + */ + public boolean isPartialRow() { + return this.partialRow; + } + + /** + * Set currentPartialResponseSize to accumulated response size + * for all the KeyValue pairs collected so far. This is only used at + * server side, and not used as a client API. + * @param responseSize + */ + public void setCurrentPartialResponseSize(int responseSize) { + this.currentPartialResponseSize = responseSize; + } + + /* + * Get current PartialResponseSize. This is only used at server side, + * and not used as a client API. + */ + public int getCurrentPartialResponseSize() { + return this.currentPartialResponseSize; } /** @@ -528,6 +583,8 @@ public class Scan extends Operation impl map.put("caching", this.caching); map.put("cacheBlocks", this.cacheBlocks); map.put("storeLimit", this.storeLimit); + map.put("maxResponseSize", this.maxResponseSize); + map.put("partialRow", this.partialRow); List timeRange = new ArrayList(); timeRange.add(this.tr.getMin()); timeRange.add(this.tr.getMax()); @@ -584,12 +641,16 @@ public class Scan extends Operation impl this.stopRow = Bytes.readByteArray(in); this.maxVersions = in.readInt(); this.batch = in.readInt(); - if (version > 1) { + if (version >= VERSION_STORE_LIMIT) { this.storeLimit = in.readInt(); } - if (version > 2) { + if (version >= VERSION_STORE_OFFSET) { this.storeOffset = in.readInt(); } + if (version >= VERSION_RESPONSE_SIZE) { + this.maxResponseSize = in.readInt(); + this.partialRow = in.readBoolean(); + } this.caching = in.readInt(); this.cacheBlocks = in.readBoolean(); 
if(in.readBoolean()) { @@ -616,22 +677,29 @@ public class Scan extends Operation impl public void write(final DataOutput out) throws IOException { byte version = (byte)1; - if (this.storeOffset != 0) { - version = SCAN_VERSION; + if (this.maxResponseSize != HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE) { + version = (byte) VERSION_RESPONSE_SIZE; + } else if (this.storeOffset != 0) { + version = (byte)VERSION_STORE_OFFSET; } else if (this.storeLimit != -1) { - version = 2; + version = (byte)VERSION_STORE_LIMIT; } + out.writeByte(version); Bytes.writeByteArray(out, this.startRow); Bytes.writeByteArray(out, this.stopRow); out.writeInt(this.maxVersions); out.writeInt(this.batch); - if (version > 1) { + if (version >= VERSION_STORE_LIMIT) { out.writeInt(this.storeLimit); } - if (version > 2) { + if (version >= VERSION_STORE_OFFSET) { out.writeInt(this.storeOffset); } + if (version >= VERSION_RESPONSE_SIZE) { + out.writeInt(this.maxResponseSize); + out.writeBoolean(this.partialRow); + } out.writeInt(this.caching); out.writeBoolean(this.cacheBlocks); if(this.filter == null) { Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1407699&r1=1407698&r2=1407699&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Nov 10 00:01:42 2012 @@ -3084,10 +3084,7 @@ public class HRegion implements HeapSize return next(outResults, limit, null); } - @Override - public synchronized boolean next(List outResults, int limit, - String metric) throws IOException { - readRequests.incrTotalRequstCount(); + private void preCondition() throws IOException{ if (this.filterClosed) { throw new 
UnknownScannerException("Scanner was closed (timed out?) " + "after we renewed it. Could be caused by a very slow scanner " + @@ -3101,7 +3098,68 @@ public class HRegion implements HeapSize // This could be a new thread from the last time we called next(). MultiVersionConsistencyControl.setThreadReadPoint(this.readPt); + } + /** + * A method to return all the rows that can fit in the response size. + * It respects the two stop conditions: + * 1) scan.getMaxResponseSize + * 2) scan.getCaching() (which is nbRows) + * The loop breaks on whichever comes first. + * This is only used by scan(), not get() + * @param outResults a list that receives the fetched rows + * @param nbRows the number of rows that can be returned at most + * @param metric the metric name + * @throws IOException propagated from the precondition check or the underlying scan + * + * This is used by Scans. + */ + public synchronized void nextRows(List outResults, int nbRows, + String metric) throws IOException { + preCondition(); + List tmpList = new ArrayList(); + int limit = this.getOriginalScan().getBatch(); + int currentNbRows = 0; + boolean moreRows = true; + // This is necessary b/c partialResponseSize is not serialized through RPC + getOriginalScan().setCurrentPartialResponseSize(0); + int maxResponseSize = getOriginalScan().getMaxResponseSize(); + do { + moreRows = nextInternal(tmpList, limit, metric); + if (!tmpList.isEmpty()) { + currentNbRows++; + if (outResults != null) { + outResults.add(new Result(tmpList)); + tmpList.clear(); + } + } + resetFilters(); + if (isFilterDone()) { + readRequests.incrTotalRequstCount(currentNbRows); + return; + } + + // While Condition + // 1. respect maxResponseSize and nbRows whichever comes first, + // 2. 
recheck the currentPartialResponseSize is to catch the case + // where maxResponseSize is saturated and partialRow == false + // since we allow this case valid in the nextInternal() layer + } while (moreRows && + (getOriginalScan().getCurrentPartialResponseSize() < maxResponseSize + && currentNbRows < nbRows)); + + readRequests.incrTotalRequstCount(currentNbRows); + } + + /** + * This is used by Gets & Compactions & unit tests, whereas nextRows() is + * used by Scans + */ + @Override + public synchronized boolean next(List outResults, int limit, + String metric) throws IOException { + readRequests.incrTotalRequstCount(); + preCondition(); boolean returnResult; if (outResults.isEmpty()) { // Usually outResults is empty. This is true when next is called @@ -3151,6 +3209,9 @@ public class HRegion implements HeapSize throw new IllegalArgumentException("First parameter should be an empty list"); } + boolean partialRow = getOriginalScan().isPartialRow(); + long maxResponseSize = getOriginalScan().getMaxResponseSize(); + while (true) { byte [] currentRow = peekRow(); if (isStopRow(currentRow)) { @@ -3170,10 +3231,17 @@ public class HRegion implements HeapSize do { this.storeHeap.next(results, limit - results.size(), metric); if (limit > 0 && results.size() == limit) { - if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException( + if (this.filter != null && filter.hasFilterRow()) + throw new IncompatibleFilterException( "Filter with filterRow(List) incompatible with scan with limit!"); return true; // we are expecting more yes, but also limited to how many we can return. } + // this guarantees that we still complete the entire row when partialRow is false, + // even if currentPartialResponseSize exceeds the maxResponseSize. 
+ if (partialRow && getOriginalScan().getCurrentPartialResponseSize() + >= maxResponseSize) { + return true; + } } while (Bytes.equals(currentRow, nextRow = peekRow())); final boolean stopRow = isStopRow(nextRow); Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1407699&r1=1407698&r2=1407699&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Nov 10 00:01:42 2012 @@ -228,8 +228,6 @@ public class HRegionServer implements HR protected final int numRegionsToReport; - private final long maxScannerResultSize; - // Remote HMaster private HMasterRegionInterface hbaseMaster; @@ -391,10 +389,6 @@ public class HRegionServer implements HR sleeper = new Sleeper(this.msgInterval, this); - this.maxScannerResultSize = conf.getLong( - HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); - // Task thread to process requests from Master this.worker = new Worker(); @@ -2536,7 +2530,9 @@ public class HRegionServer implements HR public Result [] next(final long scannerId, int nbRows) throws IOException { try { String scannerName = String.valueOf(scannerId); - InternalScanner s = this.scanners.get(scannerName); + // HRegionServer only deals with Region Scanner, + // thus, we just typecast directly + HRegion.RegionScanner s = (HRegion.RegionScanner)this.scanners.get(scannerName); if (s == null) { throw new UnknownScannerException("Name: " + scannerName); } @@ -2549,34 +2545,13 @@ public class HRegionServer implements HR throw e; } this.leases.renewLease(scannerName); - List results = new ArrayList(nbRows); 
- long currentScanResultSize = 0; - List values = new ArrayList(); - int i = 0; - for (; i < nbRows && currentScanResultSize < maxScannerResultSize; i++) { - // Collect values to be returned here - boolean moreRows = s.next(values, HRegion.METRIC_NEXTSIZE); - if (!values.isEmpty()) { - if (maxScannerResultSize < Long.MAX_VALUE){ - for (KeyValue kv : values) { - currentScanResultSize += kv.heapSize(); - } - } - results.add(new Result(values)); - } - if (!moreRows) { - break; - } - values.clear(); - } - numReads.addAndGet(i); - // Below is an ugly hack where we cast the InternalScanner to be a - // HRegion.RegionScanner. The alternative is to change InternalScanner - // interface but its used everywhere whereas we just need a bit of info - // from HRegion.RegionScanner, IF its filter if any is done with the scan + List results = new ArrayList(); + s.nextRows(results, nbRows, HRegion.METRIC_NEXTSIZE); + numReads.addAndGet(results.size()); + // IF its filter if any is done with the scan // and wants to tell the client to stop the scan. This is done by passing // a null result. - return ((HRegion.RegionScanner)s).isFilterDone() && results.isEmpty()? + return s.isFilterDone() && results.isEmpty()? 
null: results.toArray(new Result[0]); } catch (Throwable t) { if (t instanceof NotServingRegionException) { Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1407699&r1=1407698&r2=1407699&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Nov 10 00:01:42 2012 @@ -37,9 +37,7 @@ import org.apache.hadoop.hbase.regionser import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.regionserver.kvaggregator.KeyValueAggregator; - - -import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; +import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode; /** * Scanner scans both the memstore and the HStore. Coalesce KeyValue stream @@ -369,7 +367,11 @@ public class StoreScanner extends NonLaz KeyValue.KVComparator comparator = store != null ? store.getComparator() : null; - long addedResultsSize = 0; + int addedResultsSize = 0; + // set the responseSize so that it now can fetch records + // in terms of keyvalue's boundary rather than row's boundary + int remainingResponseSize = scan.getMaxResponseSize() + - scan.getCurrentPartialResponseSize(); try { LOOP: while((kv = this.heap.peek()) != null) { // kv is no longer immutable due to KeyOnlyFilter! use copy for safety @@ -436,7 +438,13 @@ public class StoreScanner extends NonLaz } else { this.heap.next(); } - + // If the user is strict about the response size and allows + // partialRow scanning, we only return the number of keyvalue + // pairs to fill up the request size. 
Otherwise when partialRow + // is false, we just fetch the entire row + if (scan.isPartialRow() && addedResultsSize >= remainingResponseSize) { + break LOOP; + } if (limit > 0 && (numNewKeyValues == limit)) { break LOOP; } @@ -509,6 +517,9 @@ public class StoreScanner extends NonLaz throw e; } finally { + // update the remaining response size + scan.setCurrentPartialResponseSize(scan.getCurrentPartialResponseSize() + + addedResultsSize); // update the counter if (addedResultsSize > 0 && metric != null) { HRegion.incrNumericMetric(this.metricNamePrefix + metric, Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java?rev=1407699&r1=1407698&r2=1407699&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestMaxResponseSize.java Sat Nov 10 00:01:42 2012 @@ -0,0 +1,310 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class TestMaxResponseSize{ + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static byte [] ROW = Bytes.toBytes("testRow"); + private static byte [] FAMILY = Bytes.toBytes("testFamily"); + private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte [] VALUE = Bytes.toBytes("testValue"); + private static int NUM_RS = 3; + private static int NUM_REGION = 10; + private static int NUM_VERSION = 3; + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(NUM_RS); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + // Nothing to do. 
+ } + + public void createTestTable() throws Exception { + } + + /** + * Test from client side for scans that are cross multiple regions + * @throws Exception + */ + @Test + public void testScanCrossRegion() throws Exception { + byte [] TABLE = Bytes.toBytes("testScanCrossRegion"); + byte[][] FAMILIES = { Bytes.toBytes("MyCF1") }; + List kvListExp = new ArrayList(); + + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, NUM_VERSION, + Bytes.toBytes("row0"), Bytes.toBytes("row99"), NUM_REGION); + + TEST_UTIL.waitUntilAllRegionsAssigned(NUM_REGION); + + Random rand = new Random(System.currentTimeMillis()); + for (int iRow = 0; iRow < 100; ++iRow) { + final byte[] row = Bytes.toBytes(String.format("row%02d", iRow)); + Put put = new Put(row); + final long ts = System.currentTimeMillis(); + for (int iCol = 0; iCol < 10; ++iCol) { + final byte[] cf = FAMILIES[0]; + + final byte[] qual = Bytes.toBytes("col" + iCol); + final byte[] value = Bytes.toBytes("value_for_row_" + iRow + "_cf_" + + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" + ts + + "_random_" + rand.nextLong()); + KeyValue kv = new KeyValue(row, cf, qual, ts, value); + put.add(kv); + kvListExp.add(kv); + } + ht.put(put); + ht.flushCommits(); + } + + boolean toLog = true; + Scan scan = new Scan(); + Result result; + // each region have 10 rows, we fetch 5 rows at a time + scan.setCaching(5); + ResultScanner scanner; + List kvListScan = new ArrayList(); + scanner = ht.getScanner(scan); + kvListScan.clear(); + // do a full scan of the table that is split among multiple regions + while ((result = scanner.next()) != null) { + for (KeyValue kv : result.list()) { + kvListScan.add(kv); + } + } + scanner.close(); + result = new Result(kvListScan); + verifyResult(result, kvListExp, toLog, "testScanCrossRegion"); + } + + /** + * Test from client side for scan with responseSize and partialRow + * responseSize is small value + * @throws Exception + */ + @Test + public void testScanMaxRequstSize() throws Exception { + 
byte [] TABLE = Bytes.toBytes("testScanMaxRequstSize"); + byte [][] ROWS= makeNAscii(ROW, 3); + byte [][] FAMILIES = makeNAscii(FAMILY, 3); + byte [][] QUALIFIERS = makeNAscii(QUALIFIER, 10); + + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES); + List kvListExp = new ArrayList(); + ht.setProfiling(true); + Put put; + int kvSize = (new KeyValue(ROWS[0], FAMILIES[0], QUALIFIERS[0], 1, VALUE)) + .getLength(); + int rowSize = (kvSize * (FAMILIES.length * QUALIFIERS.length)); + + for (int r=0; r < ROWS.length; r++) { + put = new Put(ROWS[r]); + for (int c=0; c < FAMILIES.length; c++) { + for (int q=0; q < QUALIFIERS.length; q++) { + KeyValue kv = new KeyValue(ROWS[r], FAMILIES[c], QUALIFIERS[q], 1, VALUE); + put.add(kv); + kvListExp.add(kv); + } + } + ht.put(put); + } + + /** + * Test with the small bufferSize that is smaller rowSize + * The response size is set to only fit half a row. + * if partialRow == true, the expected number of fetches is 6 in order to + * retrieve all the 3 rows, otherwise we can fetch an entire row each time, + * which makes the expected number to be 3. 
+ * + * x x x x|x x x x [2] + * < rpc1 >< rpc2 > + * x x x x|x x x x [2] + * < rpc3 >< rpc4 > + * x x x x|x x x x [2] + * < rpc5 >< rpc6 > + * + * x: kv pair + * rpc#n#: n-th rpc call + * [n] : the number of scanner.next() per row + */ + int responseSize = rowSize / 2; + // each row will take ceil(rowSize/responseSize) times to fetch + // and the number of rows is ROWS.length, + // therefore the total number of fetches is + // ceil(rowSize/responseSize) * ROWS.length + int scanCntExp = ((rowSize + responseSize - 1) / responseSize) * ROWS.length; + // each scanner.next trigger a rpc call + int rpcCntExp = scanCntExp; + testScan(ht, rowSize, kvSize, scanCntExp, rpcCntExp, kvListExp, + responseSize, true); + + /** + * x x x x|x x x x [1] + * < rpc1 > + * x x x x|x x x x [1] + * < rpc2 > + * x x x x|x x x x [1] + * < rpc3 > + */ + scanCntExp = ROWS.length; + rpcCntExp = ROWS.length; + testScan(ht, rowSize, kvSize, scanCntExp, rpcCntExp, kvListExp, + responseSize, false); + + /** + * Test with a big responseSize across multiple rows + * The response size is set to only fit one and a half rows. + * If partialRow == true, the expected number of RPC calls is 2, and the number of + * scan.next() is 4, since we need two scan.next() to finish each RPC call + * If partialRow == false, the expected number of RPC calls is 2 and the number of + * scan.next() is 3 since we need 2 scan.next() to exhaust the first RPC call. 
+ * + * x x x x x x x x [1] + * < rpc1 + * x x x x|x x x x [2] + * >< + * x x x x x x x x [1] + * rpc2 > + */ + responseSize = rowSize + rowSize / 2 ; + // nbRows: the number of rows that responseSize can at most contain + // (including the last partial row) + int nbRows = (responseSize + rowSize - 1) / rowSize; + rpcCntExp = ROWS.length * rowSize / responseSize; + scanCntExp = rpcCntExp * nbRows; + testScan(ht, rowSize, kvSize, scanCntExp, rpcCntExp, kvListExp, + responseSize, true); + + /** + * x x x x x x x x [1] + * < rpc1 + * x x x x x x x x [1] + * > + * x x x x x x x x [1] + * < rpc2 > + */ + scanCntExp = ROWS.length; + rpcCntExp = (ROWS.length + nbRows - 1) / nbRows; + testScan(ht, rowSize, kvSize, scanCntExp, rpcCntExp, kvListExp, + responseSize, false); + } + + void testScan(HTable ht, int rowSize, int kvSize, + int scanCntExp, int rpcCntExp, List kvListExp, + int responseSize, boolean partialRow) throws Exception{ + int scanCntAct = 0; + boolean toLog = true; + Scan scan = new Scan(); + Result result; + ResultScanner scanner; + List kvListScan = new ArrayList(); + scan.setCaching(responseSize, partialRow); + scanner = ht.getScanner(scan); + kvListScan.clear(); + int rpcCntAct = 0; + while ((result = scanner.next()) != null) { + scanCntAct++; + for (KeyValue kv : result.list()) { + kvListScan.add(kv); + } + if (ht.getProfilingData() != null) + rpcCntAct++; + } + scanner.close(); + assertEquals(scanCntExp, scanCntAct); + assertEquals(rpcCntExp, rpcCntAct); + result = new Result(kvListScan); + verifyResult(result, kvListExp, toLog, + "Testing scan with responseSize = " + responseSize + + ", partialRow = " + partialRow); + } + + private void verifyResult(Result result, List kvList, boolean toLog, + String msg) { + LOG.info(msg); + LOG.info("Exp cnt: " + kvList.size()); + LOG.info("True cnt is: " + result.size()); + assertEquals(kvList.size(), result.size()); + + if (kvList.size() == 0) return; + int i = 0; + for (KeyValue kv : result.sorted()) { + KeyValue 
kvExp = kvList.get(i++); + if (toLog) { + LOG.info("get kv is: " + kv.toString()); + LOG.info("exp kv is: " + kvExp.toString()); + } + assertTrue("Not equal", kvExp.equals(kv)); + } + } + + private byte [][] makeNAscii(byte [] base, int n) { + byte [][] ret = new byte[n][]; + for(int i=0;i expected = new ArrayList(); + fillTable(rows, families, 2, expected); + /** + * in this case we know kv size = 28 + * KLEN VLEN ROWLEN ROWNAME CFLEN CFNAME TS TYPE + * --4-|--4-|--2---|---4---|--1--|--4---|-8-|--1-- ===> 28 bytes + */ + Scan scan = new Scan(); + scan.setMaxVersions(3); + scan.addFamily(families[1]); + scan.addFamily(families[3]); + + // fetch one kv even when responseSize = 0, oh well, this's the semantic + // that users should be aware of + compareNextRows(scan, 0, true, Integer.MAX_VALUE, expected.subList(0, 1)); + // fetch the last kv pair if the responseSize is not big enough + compareNextRows(scan, 1, true, Integer.MAX_VALUE, expected.subList(0, 1)); + // maxResponseSize perfectly fits one kv + compareNextRows(scan, 28, true, Integer.MAX_VALUE, expected.subList(0, 1)); + + // if partialRow == true, fetch as much as maxResponseSize allows + compareNextRows(scan, 29, true, Integer.MAX_VALUE, expected.subList(0, 2)); + // if partialRow == false, fetch the entire row + compareNextRows(scan, 29, false, Integer.MAX_VALUE, expected.subList(0, 6)); + + // fetch everything in the table as long as responseSize is big enough + compareNextRows(scan, 10000, true, Integer.MAX_VALUE, expected); + compareNextRows(scan, 10000, false, Integer.MAX_VALUE, expected); + + // check nbRows + // fetch two rows, each has two columns and each column has 3 kvs + compareNextRows(scan, 10000, true, 2, expected.subList(0, 12)); + compareNextRows(scan, 10000, false, 2, expected.subList(0, 12)); + } + + private void fillTable(byte[][] rows, byte[][] families, int nTs, + List expected) throws IOException { + Put put = null; + long ts = System.currentTimeMillis(); + long[] timestamps = { ts, 
ts - 10, ts - 20 }; + for (byte[] row : rows) { + put = new Put(row); + for (byte[] cf : families) { + for (long t : timestamps) { + put.add(cf, null, t, null); + if (cf.equals(families[1]) || cf.equals(families[3])) { + expected.add(new KeyValue(row, cf, null, t, KeyValue.Type.Put, + null)); + } + } + } + region.put(put); + } + } + + private void compareNextRows(Scan scan, int responseSize, boolean partialRow, + int nbRows, List expected) + throws IOException { + if (nbRows == Integer.MAX_VALUE) { + scan.setCaching(responseSize, partialRow); + } else { + scan.setCaching(nbRows); + } + RegionScanner rs = (RegionScanner) region.getScanner(scan); + List kvListScan = new ArrayList(); + List results = new ArrayList(); + rs.nextRows(results, nbRows, null); + for (Result res : results) { + for (KeyValue kv : res.list()) { + kvListScan.add(kv); + } + } + assertEquals(expected.size(), kvListScan.size()); + for (int i = 0; i < kvListScan.size(); i++) { + assertEquals(expected.get(i), kvListScan.get(i)); + } + rs.close(); + } + public void testRegionScanner_Next() throws IOException { byte [] tableName = Bytes.toBytes("testtable"); byte [] row1 = Bytes.toBytes("row1"); Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java?rev=1407699&r1=1407698&r2=1407699&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogFiltering.java Sat Nov 10 00:01:42 2012 @@ -72,6 +72,7 @@ public class TestHLogFiltering { private void fillTable() throws IOException, InterruptedException { HTable table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES, 3, Bytes.toBytes("row0"), 
Bytes.toBytes("row99"), NUM_RS); + TEST_UTIL.waitUntilAllRegionsAssigned(NUM_RS); Random rand = new Random(19387129L); for (int iStoreFile = 0; iStoreFile < 4; ++iStoreFile) { for (int iRow = 0; iRow < 100; ++iRow) { @@ -98,7 +99,6 @@ public class TestHLogFiltering { table.flushCommits(); } } - TEST_UTIL.waitUntilAllRegionsAssigned(NUM_RS); } @Test