Subject: svn commit: r1460199 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/
Date: Sat, 23 Mar 2013 18:18:39 -0000
To: commits@hbase.apache.org
From: liyin@apache.org
Reply-To: dev@hbase.apache.org

Author: liyin
Date: Sat Mar 23 18:18:39 2013
New Revision: 1460199

URL: http://svn.apache.org/r1460199
Log:
[HBASE-8185] Pulling HRegionScanner code outside the HRegion class.

Author: manukranthk

Summary: This diff decomposes D708638 into a few smaller diffs for a quicker and easier review process.

Test Plan: Since this is a pure refactoring change, verify it by running the MR unit tests.
Reviewers: liyintang, rshroff, aaiyer

Reviewed By: liyintang

CC: hbase-eng@, erling, arice

Differential Revision: https://phabricator.fb.com/D743249

Task ID: 2103689

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Mar 23 18:18:39 2013
@@ -91,8 +91,6 @@ import org.apache.hadoop.hbase.ipc.HRegi
 import org.apache.hadoop.hbase.metrics.RequestMetrics;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner.ScanPrefetcher;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner.ScanResult;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -587,8 +585,9 @@ public class HRegion implements HeapSize
     if (!families.isEmpty()) {
       // initialize the thread pool for opening stores in parallel.
       ThreadPoolExecutor storeOpenerThreadPool =
-        getStoreOpenAndCloseThreadPool(
-          "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
+        StoreThreadUtils.getStoreOpenAndCloseThreadPool(
+          "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString(),
+          this.getRegionInfo(), this.conf);
       CompletionService<Store> completionService =
         new ExecutorCompletionService<Store>(storeOpenerThreadPool);
@@ -857,8 +856,8 @@ public class HRegion implements HeapSize
       if (!stores.isEmpty()) {
         // initialize the thread pool for closing stores in parallel.
         ThreadPoolExecutor storeCloserThreadPool =
-          getStoreOpenAndCloseThreadPool("StoreCloserThread-"
-            + this.regionInfo.getRegionNameAsString());
+          StoreThreadUtils.getStoreOpenAndCloseThreadPool("StoreCloserThread-"
+            + this.regionInfo.getRegionNameAsString(), this.getRegionInfo(), this.conf);
         CompletionService<ImmutableList<StoreFile>> completionService =
           new ExecutorCompletionService<ImmutableList<StoreFile>>(
             storeCloserThreadPool);
@@ -902,41 +901,6 @@ public class HRegion implements HeapSize
     }
   }

-  protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
-      final String threadNamePrefix) {
-    int numStores = Math.max(1, this.regionInfo.getTableDesc().families.size());
-    int maxThreads = Math.min(numStores,
-        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
-            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
-    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
-  }
-
-  protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
-      final String threadNamePrefix) {
-    int numStores = Math.max(1, this.regionInfo.getTableDesc().families.size());
-    int maxThreads = Math.max(1,
-        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
-            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
-            / numStores);
-    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
-  }
-
-  private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
-      final String threadNamePrefix) {
-    ThreadPoolExecutor openAndCloseThreadPool = Threads
-        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-            new ThreadFactory() {
-              private int count = 1;
-
-              public Thread newThread(Runnable r) {
-                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
-                t.setDaemon(true);
-                return t;
-              }
-            });
-    return openAndCloseThreadPool;
-  }
-
   /**
    * @return True if it's worth doing a flush before we put up the close flag.
    */
@@ -1686,7 +1650,9 @@ public class HRegion implements HeapSize
   }

   protected InternalScanner instantiateInternalScanner(Scan scan,
       List<KeyValueScanner> additionalScanners) throws IOException {
-    return new RegionScanner(scan, additionalScanners);
+    RegionContext regionContext = new RegionContext(stores, scannerReadPoints,
+        comparator, mvcc, closing, closed, regionInfo, rowReadCnt);
+    return new RegionScanner(scan, additionalScanners, regionContext);
   }

   /*
@@ -2997,408 +2964,6 @@ public class HRegion implements HeapSize
     return this.tableDir;
   }

-  /**
-   * RegionScanner is an iterator through a bunch of rows in an HRegion.
-   * <p>
-   * It is used to combine scanners from multiple Stores (aka column families).
-   */

-  class RegionScanner implements InternalScanner {
-    // Package local for testability
-    KeyValueHeap storeHeap = null;
-    private final byte [] stopRow;
-    private Filter filter;
-    private final int batch;
-    private int isScan;
-    private boolean filterClosed = false;
-    private long readPt;
-    private Scan originalScan;
-    private Future<ScanResult> prefetchScanFuture = null;
-
-    RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
-      //DebugPrint.println("HRegionScanner.<init>");
-
-      this.originalScan = scan;
-
-      this.filter = scan.getFilter();
-      this.batch = scan.getBatch();
-      if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
-        this.stopRow = null;
-      } else {
-        this.stopRow = scan.getStopRow();
-      }
-      // If we are doing a get, we want to be [startRow,endRow] normally
-      // it is [startRow,endRow) and if startRow=endRow we get nothing.
-      this.isScan = scan.isGetScan() ? -1 : 0;
-
-      // synchronize on scannerReadPoints so that nobody calculates
-      // getSmallestReadPoint, before scannerReadPoints is updated.
-      synchronized(scannerReadPoints) {
-        this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
-        scannerReadPoints.put(this, this.readPt);
-      }
-
-      List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
-      if (additionalScanners != null) {
-        scanners.addAll(additionalScanners);
-      }
-
-      for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
-          scan.getFamilyMap().entrySet()) {
-        Store store = stores.get(entry.getKey());
-        StoreScanner scanner = store.getScanner(scan, entry.getValue());
-        scanners.add(scanner);
-      }
-      this.storeHeap = new KeyValueHeap(scanners, comparator);
-    }
-
-    RegionScanner(Scan scan) throws IOException {
-      this(scan, null);
-    }
-
-    /**
-     * Reset both the filter and the old filter.
-     */
-    protected void resetFilters() {
-      if (filter != null) {
-        filter.reset();
-      }
-    }
-
-    @Override
-    public boolean next(List<KeyValue> outResults, int limit)
-        throws IOException {
-      return next(outResults, limit, null);
-    }
-
-    private void preCondition() throws IOException {
-      if (this.filterClosed) {
-        throw new UnknownScannerException("Scanner was closed (timed out?) " +
-            "after we renewed it. Could be caused by a very slow scanner " +
-            "or a lengthy garbage collection");
-      }
-      if (closing.get() || closed.get()) {
-        close();
-        throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
-            " is closing=" + closing.get() + " or closed=" + closed.get());
-      }
-
-      // This could be a new thread from the last time we called next().
-      MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
-    }
-
-    /**
-     * This class abstracts the results of a single scanner's result. It tracks
-     * the list of Result objects if the pre-fetch next was successful, and
-     * tracks the exception if the next failed.
-     */
-    class ScanResult {
-      final boolean isException;
-      IOException ioException = null;
-      Result[] outResults;
-      boolean moreRows;
-
-      public ScanResult(IOException ioException) {
-        isException = true;
-        this.ioException = ioException;
-      }
-
-      public ScanResult(boolean moreRows, Result[] outResults) {
-        isException = false;
-        this.moreRows = moreRows;
-        this.outResults = outResults;
-      }
-    }
-
-    /**
-     * This Callable abstracts calling a pre-fetch next. This is called on a
-     * threadpool. It makes a pre-fetch next call with the same parameters as
-     * the incoming next call. Note that the number of rows to return (nbRows)
-     * and/or the memory size for the result is the same as the previous call if
-     * pre-fetching is enabled. If these params change dynamically, they will
-     * take effect in the subsequent iteration.
-     */
-    class ScanPrefetcher implements Callable<ScanResult> {
-      int nbRows;
-      int limit;
-      String metric;
-
-      ScanPrefetcher(int nbRows, int limit, String metric) {
-        this.nbRows = nbRows;
-        this.limit = limit;
-        this.metric = metric;
-      }
-
-      public ScanResult call() {
-        ScanResult scanResult = null;
-        List<Result> outResults = new ArrayList<Result>();
-        List<KeyValue> tmpList = new ArrayList<KeyValue>();
-        int currentNbRows = 0;
-        boolean moreRows = true;
-        try {
-          // This is necessary b/c partialResponseSize is not serialized through
-          // RPC
-          getOriginalScan().setCurrentPartialResponseSize(0);
-          int maxResponseSize = getOriginalScan().getMaxResponseSize();
-          do {
-            moreRows = nextInternal(tmpList, limit, metric);
-            if (!tmpList.isEmpty()) {
-              currentNbRows++;
-              if (outResults != null) {
-                outResults.add(new Result(tmpList));
-                tmpList.clear();
-              }
-            }
-            resetFilters();
-            if (isFilterDone()) {
-              break;
-            }
-
-            // While Condition
-            // 1. respect maxResponseSize and nbRows whichever comes first,
-            // 2. recheck the currentPartialResponseSize is to catch the case
-            // where maxResponseSize is saturated and partialRow == false
-            // since we allow this case valid in the nextInternal() layer
-          } while (moreRows
-              && (getOriginalScan().getCurrentPartialResponseSize() <
-                  maxResponseSize && currentNbRows < nbRows));
-          scanResult = new ScanResult(moreRows,
-              outResults.toArray(new Result[0]));
-        } catch (IOException e) {
-          // we should queue the exception as the result so that we can return
-          // this when the result is asked for
-          scanResult = new ScanResult(e);
-        }
-        return scanResult;
-      }
-    }
-
-    /**
-     * A method to return all the rows that can fit in the response size.
-     * it respects the two stop conditions:
-     * 1) scan.getMaxResponseSize
-     * 2) scan.getCaching() (which is nbRows)
-     * the loop breaks whoever comes first.
-     * This is only used by scan(), not get()
-     * @param nbRows the number of rows that can be returned at most
-     * @param metric the metric name
-     * @return true if there are more rows to fetch.
-     *
-     * This is used by Scans.
-     */
-    public synchronized Result[] nextRows(int nbRows, String metric)
-        throws IOException {
-      preCondition();
-      boolean prefetchingEnabled = getOriginalScan().getServerPrefetching();
-      int limit = this.getOriginalScan().getBatch();
-      ScanResult scanResult;
-      // if we have a prefetched result, then use it
-      if (prefetchingEnabled && prefetchScanFuture != null) {
-        try {
-          scanResult = prefetchScanFuture.get();
-          prefetchScanFuture = null;
-        } catch (InterruptedException e) {
-          throw new IOException(e);
-        } catch (ExecutionException e) {
-          throw new IOException(e);
-        }
-        if (scanResult.isException) {
-          throw scanResult.ioException;
-        }
-      }
-      // if there are no prefetched results, then preform the scan inline
-      else {
-        ScanPrefetcher scanFetch = new ScanPrefetcher(nbRows, limit, metric);
-        scanResult = scanFetch.call();
-      }
-
-      if (scanResult.isException) {
-        throw scanResult.ioException;
-      }
-
-      // schedule a background prefetch for the next result if prefetch is
-      // enabled on scans
-      boolean scanDone =
-          (scanResult.outResults == null || scanResult.outResults.length == 0);
-      if (prefetchingEnabled && !scanDone) {
-        ScanPrefetcher callable = new ScanPrefetcher(nbRows, limit, metric);
-        prefetchScanFuture = HRegionServer.scanPrefetchThreadPool.submit(callable);
-      }
-      if (!scanDone) {
-        rowReadCnt.addAndGet(scanResult.outResults.length);
-      }
-      return scanResult.outResults == null ||
-          (isFilterDone() && scanResult.outResults.length == 0) ?
-          null : scanResult.outResults;
-    }
-
-    /**
-     * This is used by Gets & unit tests, whereas nextRows() is
-     * used by Scans
-     */
-    @Override
-    public synchronized boolean next(List<KeyValue> outResults, int limit,
-        String metric) throws IOException {
-      preCondition();
-      boolean returnResult;
-      if (outResults.isEmpty()) {
-        // Usually outResults is empty. This is true when next is called
-        // to handle scan or get operation.
-        returnResult = nextInternal(outResults, limit, metric);
-      } else {
-        List<KeyValue> tmpList = new ArrayList<KeyValue>();
-        returnResult = nextInternal(tmpList, limit, metric);
-        outResults.addAll(tmpList);
-      }
-      rowReadCnt.incrementAndGet();
-      resetFilters();
-      if (isFilterDone()) {
-        return false;
-      }
-      return returnResult;
-    }
-
-    @Override
-    public boolean next(List<KeyValue> outResults)
-        throws IOException {
-      // apply the batching limit by default
-      return next(outResults, batch, null);
-    }
-
-    @Override
-    public boolean next(List<KeyValue> outResults, String metric)
-        throws IOException {
-      // apply the batching limit by default
-      return next(outResults, batch, metric);
-    }
-
-    /*
-     * @return True if a filter rules the scanner is over, done.
-     */
-    private boolean isFilterDone() {
-      return this.filter != null && this.filter.filterAllRemaining();
-    }
-
-    /**
-     * @param results empty list in which results will be stored
-     */
-    private boolean nextInternal(List<KeyValue> results, int limit, String metric)
-        throws IOException {
-
-      if (!results.isEmpty()) {
-        throw new IllegalArgumentException("First parameter should be an empty list");
-      }
-
-      boolean partialRow = getOriginalScan().isPartialRow();
-      long maxResponseSize = getOriginalScan().getMaxResponseSize();
-
-      while (true) {
-        byte [] currentRow = peekRow();
-        if (isStopRow(currentRow)) {
-          if (filter != null && filter.hasFilterRow()) {
-            filter.filterRow(results);
-          }
-          if (filter != null && filter.filterRow()) {
-            results.clear();
-          }
-
-          return false;
-        } else if (filterRowKey(currentRow)) {
-          nextRow(currentRow);
-          results.clear();
-        } else {
-          byte [] nextRow;
-          do {
-            this.storeHeap.next(results, limit - results.size(), metric);
-            if (limit > 0 && results.size() == limit) {
-              if (this.filter != null && filter.hasFilterRow())
-                throw new IncompatibleFilterException(
-                    "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
-              return true; // we are expecting more yes, but also limited to how many we can return.
-            }
-            // this gaurantees that we still complete the entire row if
-            // currentPartialResponseSize exceeds the maxResponseSize.
-            if (partialRow && getOriginalScan().getCurrentPartialResponseSize()
-                >= maxResponseSize) {
-              return true;
-            }
-          } while (Bytes.equals(currentRow, nextRow = peekRow()));
-
-          final boolean stopRow = isStopRow(nextRow);
-
-          // now that we have an entire row, lets process with a filters:
-
-          // first filter with the filterRow(List)
-          if (filter != null && filter.hasFilterRow()) {
-            filter.filterRow(results);
-          }
-
-          if (results.isEmpty() || filterRow()) {
-            nextRow(currentRow);
-            results.clear();
-
-            // This row was totally filtered out, if this is NOT the last row,
-            // we should continue on.
-
-            if (!stopRow) continue;
-          }
-          return !stopRow;
-        }
-      }
-    }
-
-    private boolean filterRow() {
-      return filter != null
-          && filter.filterRow();
-    }
-
-    private boolean filterRowKey(byte[] row) {
-      return filter != null
-          && filter.filterRowKey(row, 0, row.length);
-    }
-
-    protected void nextRow(byte [] currentRow) throws IOException {
-      while (Bytes.equals(currentRow, peekRow())) {
-        this.storeHeap.next(MOCKED_LIST);
-      }
-      resetFilters();
-    }
-
-    private byte[] peekRow() {
-      KeyValue kv = this.storeHeap.peek();
-      return kv == null ? null : kv.getRow();
-    }
-
-    private boolean isStopRow(byte [] currentRow) {
-      return currentRow == null ||
-          (stopRow != null &&
-           comparator.compareRows(stopRow, 0, stopRow.length,
-               currentRow, 0, currentRow.length) <= isScan);
-    }
-
-    @Override
-    public synchronized void close() {
-      if (storeHeap != null) {
-        storeHeap.close();
-        storeHeap = null;
-      }
-      // no need to sychronize here.
-      scannerReadPoints.remove(this);
-      this.filterClosed = true;
-    }
-
-    KeyValueHeap getStoreHeapForTesting() {
-      return storeHeap;
-    }
-
-    /**
-     * Get the original scan object that was used to create this internal one
-     * @return original scan object... used for debug output
-     */
-    public Scan getOriginalScan() {
-      return originalScan;
-    }
-  }
-
   // Utility methods

   /**
   * A utility method to create new instances of HRegion based on the
@@ -4283,7 +3848,7 @@ public class HRegion implements HeapSize
   /**
    * A mocked list implementation - discards all updates.
    */
-  private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
+  public static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {

     @Override
     public void add(int index, KeyValue element) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Mar 23 18:18:39 2013
@@ -2635,8 +2635,8 @@ public class HRegionServer implements HR
     long scannerId = (Long) params[0];
     String scannerName = String.valueOf(scannerId);
     InternalScanner s = regionServer.scanners.get(scannerName);
-    if (s != null && s instanceof HRegion.RegionScanner) {
-      res.put("scan", ((HRegion.RegionScanner)s).getOriginalScan().toMap(DEFAULT_MAX_COLS));
+    if (s != null && s instanceof RegionScanner) {
+      res.put("scan", ((RegionScanner)s).getOriginalScan().toMap(DEFAULT_MAX_COLS));
     }

     if (params.length > 1) {
@@ -2653,7 +2653,7 @@ public class HRegionServer implements HR
     String scannerName = String.valueOf(scannerId);
     // HRegionServer only deals with Region Scanner,
     // thus, we just typecast directly
-    HRegion.RegionScanner s = (HRegion.RegionScanner)this.scanners.get(scannerName);
+    RegionScanner s = (RegionScanner)this.scanners.get(scannerName);
     if (s == null) {
       throw new UnknownScannerException("Name: " + scannerName);
     }
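Aside for reviewers: MOCKED_LIST becomes public above so that the now-external RegionScanner can keep using HRegion's discard-all list when it skips the rest of a row in nextRow(). The idiom is a write-only AbstractList; a minimal self-contained JDK sketch of it (DiscardingList is a made-up name, not part of this commit):

    import java.util.AbstractList;

    // A write-only sink: add() succeeds but stores nothing, so a caller can
    // drain a scanner "into" it to skip data without buffering KeyValues.
    final class DiscardingList<E> extends AbstractList<E> {
      @Override
      public void add(int index, E element) {
        // intentionally discard the element
      }

      @Override
      public E get(int index) {
        throw new UnsupportedOperationException("discard-only list");
      }

      @Override
      public int size() {
        return 0; // always "empty", so add(e) keeps routing through add(0, e)
      }
    }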
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java?rev=1460199&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionContext.java Sat Mar 23 18:18:39 2013
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+
+/**
+ * RegionContext is a container for the HRegion member variables that
+ * RegionScanner needs in its constructor. While RegionScanner was an inner
+ * class of HRegion, these members were directly accessible; now that
+ * RegionScanner is a top-level class, they are packaged into a RegionContext.
+ *
+ * @author manukranthk
+ */
+public class RegionContext {
+  private final Map<byte[], Store> stores;
+  private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
+  private final KeyValue.KVComparator comparator;
+  private final MultiVersionConsistencyControl mvcc;
+  private final AtomicBoolean closing;
+  private final AtomicBoolean closed;
+  private final HRegionInfo regionInfo;
+  private final AtomicInteger rowReadCnt;
+
+  public RegionContext(Map<byte[], Store> stores,
+      ConcurrentHashMap<RegionScanner, Long> scannerReadPoints,
+      KeyValue.KVComparator comparator,
+      MultiVersionConsistencyControl mvcc,
+      AtomicBoolean closing,
+      AtomicBoolean closed,
+      HRegionInfo regionInfo,
+      AtomicInteger rowReadCnt) {
+    this.stores = stores;
+    this.scannerReadPoints = scannerReadPoints;
+    this.comparator = comparator;
+    this.mvcc = mvcc;
+    this.closing = closing;
+    this.closed = closed;
+    this.regionInfo = regionInfo;
+    this.rowReadCnt = rowReadCnt;
+  }
+
+  /**
+   * Constructor to create a region context for
+   * opening a read-only RegionScanner.
+   */
+  public RegionContext(Map<byte[], Store> stores, HRegionInfo regionInfo) {
+    this.stores = stores;
+    this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();
+    this.comparator = regionInfo.getComparator();
+    this.mvcc = new MultiVersionConsistencyControl();
+    MultiVersionConsistencyControl.setThreadReadPoint(Long.MAX_VALUE);
+    this.closing = new AtomicBoolean(false);
+    this.closed = new AtomicBoolean(false);
+    this.regionInfo = regionInfo;
+    this.rowReadCnt = new AtomicInteger(0);
+  }
+
+  public Map<byte[], Store> getStores() {
+    return this.stores;
+  }
+
+  public ConcurrentHashMap<RegionScanner, Long> getScannerReadPoints() {
+    return this.scannerReadPoints;
+  }
+
+  public KeyValue.KVComparator getComparator() {
+    return this.comparator;
+  }
+
+  public MultiVersionConsistencyControl getMvcc() {
+    return this.mvcc;
+  }
+
+  public AtomicBoolean getClosing() {
+    return this.closing;
+  }
+
+  public AtomicBoolean getClosed() {
+    return this.closed;
+  }
+
+  public HRegionInfo getRegionInfo() {
+    return this.regionInfo;
+  }
+
+  public AtomicInteger getRowReadCnt() {
+    return this.rowReadCnt;
+  }
+}
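To make the new wiring concrete: inside HRegion, instantiateInternalScanner() bundles the live region state into a RegionContext, while the two-argument constructor fabricates fresh MVCC and closing/closed state for a read-only scanner. A rough sketch of both paths, as a hypothetical caller (loadStores() and loadRegionInfo() are made-up helpers standing in for whatever owns the region state; this is an illustration, not code from this commit):

    // Path 1: inside a live HRegion (mirrors instantiateInternalScanner()).
    RegionContext ctx = new RegionContext(stores, scannerReadPoints,
        comparator, mvcc, closing, closed, regionInfo, rowReadCnt);
    InternalScanner scanner = new RegionScanner(scan, null, ctx);

    // Path 2: read-only scanning over a set of Stores without a live region.
    // The two-argument constructor supplies a fresh MVCC with read point
    // Long.MAX_VALUE and closing/closed flags that are never set.
    Map<byte[], Store> snapshotStores = loadStores();   // made-up helper
    HRegionInfo info = loadRegionInfo();                // made-up helper
    RegionContext readOnlyCtx = new RegionContext(snapshotStores, info);
    InternalScanner roScanner = new RegionScanner(new Scan(), null, readOnlyCtx);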
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java?rev=1460199&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java Sat Mar 23 18:18:39 2013
@@ -0,0 +1,449 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * RegionScanner is an iterator through a bunch of rows in an HRegion.
+ * <p>
+ * It is used to combine scanners from multiple Stores (aka column families).
+ */
+public class RegionScanner implements InternalScanner {
+  // Package-local for testability
+  KeyValueHeap storeHeap = null;
+  private final byte [] stopRow;
+  private Filter filter;
+  private final int batch;
+  private int isScan;
+  private boolean filterClosed = false;
+  private long readPt;
+  private Scan originalScan;
+  private Future<ScanResult> prefetchScanFuture = null;
+  private Map<byte[], Store> stores;
+  private KeyValue.KVComparator comparator;
+  private ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
+  private MultiVersionConsistencyControl mvcc;
+  private AtomicBoolean closing;
+  private AtomicBoolean closed;
+  private HRegionInfo regionInfo;
+  private AtomicInteger rowReadCnt;
+  private static final List<KeyValue> MOCKED_LIST = HRegion.MOCKED_LIST;
+
+  public RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
+      RegionContext regionContext) throws IOException {
+    this.stores = regionContext.getStores();
+    this.scannerReadPoints = regionContext.getScannerReadPoints();
+    this.comparator = regionContext.getComparator();
+    this.mvcc = regionContext.getMvcc();
+    this.closing = regionContext.getClosing();
+    this.closed = regionContext.getClosed();
+    this.regionInfo = regionContext.getRegionInfo();
+    this.rowReadCnt = regionContext.getRowReadCnt();
+    this.originalScan = scan;
+
+    this.filter = scan.getFilter();
+    this.batch = scan.getBatch();
+    if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
+      this.stopRow = null;
+    } else {
+      this.stopRow = scan.getStopRow();
+    }
+    // If we are doing a get, we want to be [startRow,endRow] normally
+    // it is [startRow,endRow) and if startRow=endRow we get nothing.
+    this.isScan = scan.isGetScan() ? -1 : 0;
+
+    // synchronize on scannerReadPoints so that nobody calculates
+    // getSmallestReadPoint before scannerReadPoints is updated.
+    synchronized (scannerReadPoints) {
+      this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
+      scannerReadPoints.put(this, this.readPt);
+    }
+
+    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
+    if (additionalScanners != null) {
+      scanners.addAll(additionalScanners);
+    }
+
+    for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
+        scan.getFamilyMap().entrySet()) {
+      Store store = stores.get(entry.getKey());
+      StoreScanner scanner = store.getScanner(scan, entry.getValue());
+      scanners.add(scanner);
+    }
+    this.storeHeap = new KeyValueHeap(scanners, comparator);
+  }
+
+  /**
+   * Reset both the filter and the old filter.
+   */
+  protected void resetFilters() {
+    if (filter != null) {
+      filter.reset();
+    }
+  }
+
+  @Override
+  public boolean next(List<KeyValue> outResults, int limit)
+      throws IOException {
+    return next(outResults, limit, null);
+  }
+
+  private void preCondition() throws IOException {
+    if (this.filterClosed) {
+      throw new UnknownScannerException("Scanner was closed (timed out?) " +
+          "after we renewed it. Could be caused by a very slow scanner " +
+          "or a lengthy garbage collection");
+    }
+    if (closing.get() || closed.get()) {
+      close();
+      throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+          " is closing=" + closing.get() + " or closed=" + closed.get());
+    }
+
+    // This could be a new thread from the last time we called next().
+    MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
+  }
+
+  /**
+   * Holds the outcome of a single pre-fetch next call: the list of Result
+   * objects if the pre-fetch succeeded, or the exception if it failed.
+   */
+  class ScanResult {
+    final boolean isException;
+    IOException ioException = null;
+    Result[] outResults;
+    boolean moreRows;
+
+    public ScanResult(IOException ioException) {
+      isException = true;
+      this.ioException = ioException;
+    }
+
+    public ScanResult(boolean moreRows, Result[] outResults) {
+      isException = false;
+      this.moreRows = moreRows;
+      this.outResults = outResults;
+    }
+  }
+
+  /**
+   * This Callable abstracts calling a pre-fetch next. It is run on a
+   * thread pool and makes a pre-fetch next call with the same parameters as
+   * the incoming next call. Note that the number of rows to return (nbRows)
+   * and/or the memory size for the result is the same as the previous call if
+   * pre-fetching is enabled. If these params change dynamically, they
+   * take effect in the subsequent iteration.
+   */
+  class ScanPrefetcher implements Callable<ScanResult> {
+    int nbRows;
+    int limit;
+    String metric;
+
+    ScanPrefetcher(int nbRows, int limit, String metric) {
+      this.nbRows = nbRows;
+      this.limit = limit;
+      this.metric = metric;
+    }
+
+    public ScanResult call() {
+      ScanResult scanResult = null;
+      List<Result> outResults = new ArrayList<Result>();
+      List<KeyValue> tmpList = new ArrayList<KeyValue>();
+      int currentNbRows = 0;
+      boolean moreRows = true;
+      try {
+        // This is necessary b/c partialResponseSize is not serialized through
+        // RPC
+        getOriginalScan().setCurrentPartialResponseSize(0);
+        int maxResponseSize = getOriginalScan().getMaxResponseSize();
+        do {
+          moreRows = nextInternal(tmpList, limit, metric);
+          if (!tmpList.isEmpty()) {
+            currentNbRows++;
+            if (outResults != null) {
+              outResults.add(new Result(tmpList));
+              tmpList.clear();
+            }
+          }
+          resetFilters();
+          if (isFilterDone()) {
+            break;
+          }
+
+          // While Condition
+          // 1. respect maxResponseSize and nbRows, whichever comes first,
+          // 2. recheck currentPartialResponseSize to catch the case
+          // where maxResponseSize is saturated and partialRow == false,
+          // since we allow this case as valid in the nextInternal() layer
+        } while (moreRows
+            && (getOriginalScan().getCurrentPartialResponseSize() <
+                maxResponseSize && currentNbRows < nbRows));
+        scanResult = new ScanResult(moreRows,
+            outResults.toArray(new Result[0]));
+      } catch (IOException e) {
+        // we should queue the exception as the result so that we can return
+        // this when the result is asked for
+        scanResult = new ScanResult(e);
+      }
+      return scanResult;
+    }
+  }
+
+  /**
+   * Returns all the rows that fit in the response size. It respects two stop
+   * conditions, breaking the loop on whichever is hit first:
+   * 1) scan.getMaxResponseSize()
+   * 2) scan.getCaching() (which is nbRows)
+   * This is only used by scan(), never by get().
+   * @param nbRows the maximum number of rows to return
+   * @param metric the metric name
+   * @return the rows fetched, or null if the scan is done; used by Scans.
+   */
+  public synchronized Result[] nextRows(int nbRows, String metric)
+      throws IOException {
+    preCondition();
+    boolean prefetchingEnabled = getOriginalScan().getServerPrefetching();
+    int limit = this.getOriginalScan().getBatch();
+    ScanResult scanResult;
+    // if we have a prefetched result, then use it
+    if (prefetchingEnabled && prefetchScanFuture != null) {
+      try {
+        scanResult = prefetchScanFuture.get();
+        prefetchScanFuture = null;
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    // if there is no prefetched result, then perform the scan inline
+    } else {
+      ScanPrefetcher scanFetch = new ScanPrefetcher(nbRows, limit, metric);
+      scanResult = scanFetch.call();
+    }
+
+    if (scanResult.isException) {
+      throw scanResult.ioException;
+    }
+
+    // schedule a background prefetch for the next result if prefetch is
+    // enabled on scans
+    boolean scanDone =
+        (scanResult.outResults == null || scanResult.outResults.length == 0);
+    if (prefetchingEnabled && !scanDone) {
+      ScanPrefetcher callable = new ScanPrefetcher(nbRows, limit, metric);
+      prefetchScanFuture = HRegionServer.scanPrefetchThreadPool.submit(callable);
+    }
+    if (!scanDone) {
+      rowReadCnt.addAndGet(scanResult.outResults.length);
+    }
+    return scanResult.outResults == null ||
+        (isFilterDone() && scanResult.outResults.length == 0) ?
+        null : scanResult.outResults;
+  }
+
+  /**
+   * This is used by Gets & unit tests, whereas nextRows() is
+   * used by Scans.
+   */
+  @Override
+  public synchronized boolean next(List<KeyValue> outResults, int limit,
+      String metric) throws IOException {
+    preCondition();
+    boolean returnResult;
+    if (outResults.isEmpty()) {
+      // Usually outResults is empty. This is true when next is called
+      // to handle a scan or get operation.
+      returnResult = nextInternal(outResults, limit, metric);
+    } else {
+      List<KeyValue> tmpList = new ArrayList<KeyValue>();
+      returnResult = nextInternal(tmpList, limit, metric);
+      outResults.addAll(tmpList);
+    }
+    rowReadCnt.incrementAndGet();
+    resetFilters();
+    if (isFilterDone()) {
+      return false;
+    }
+    return returnResult;
+  }
+
+  @Override
+  public boolean next(List<KeyValue> outResults)
+      throws IOException {
+    // apply the batching limit by default
+    return next(outResults, batch, null);
+  }
+
+  @Override
+  public boolean next(List<KeyValue> outResults, String metric)
+      throws IOException {
+    // apply the batching limit by default
+    return next(outResults, batch, metric);
+  }
+
+  /*
+   * @return True if a filter has ruled that the scanner is done.
+   */
+  private boolean isFilterDone() {
+    return this.filter != null && this.filter.filterAllRemaining();
+  }
+
+  /**
+   * @param results empty list in which results will be stored
+   */
+  private boolean nextInternal(List<KeyValue> results, int limit, String metric)
+      throws IOException {
+
+    if (!results.isEmpty()) {
+      throw new IllegalArgumentException("First parameter should be an empty list");
+    }
+
+    boolean partialRow = getOriginalScan().isPartialRow();
+    long maxResponseSize = getOriginalScan().getMaxResponseSize();
+    while (true) {
+      byte [] currentRow = peekRow();
+      if (isStopRow(currentRow)) {
+        if (filter != null && filter.hasFilterRow()) {
+          filter.filterRow(results);
+        }
+        if (filter != null && filter.filterRow()) {
+          results.clear();
+        }
+
+        return false;
+      } else if (filterRowKey(currentRow)) {
+        nextRow(currentRow);
+        results.clear();
+      } else {
+        byte [] nextRow;
+        do {
+          this.storeHeap.next(results, limit - results.size(), metric);
+          if (limit > 0 && results.size() == limit) {
+            if (this.filter != null && filter.hasFilterRow())
+              throw new IncompatibleFilterException(
+                  "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
+            return true; // we are expecting more rows, but also limited to how many we can return.
+          }
+          // this guarantees that we still complete the entire row if
+          // currentPartialResponseSize exceeds the maxResponseSize.
+          if (partialRow && getOriginalScan().getCurrentPartialResponseSize()
+              >= maxResponseSize) {
+            return true;
+          }
+        } while (Bytes.equals(currentRow, nextRow = peekRow()));

+        final boolean stopRow = isStopRow(nextRow);
+
+        // now that we have an entire row, let's process it with the filters:
+
+        // first filter with filterRow(List)
+        if (filter != null && filter.hasFilterRow()) {
+          filter.filterRow(results);
+        }
+
+        if (results.isEmpty() || filterRow()) {
+          nextRow(currentRow);
+          results.clear();
+
+          // This row was totally filtered out; if this is NOT the last row,
+          // we should continue on.
+
+          if (!stopRow) continue;
+        }
+        return !stopRow;
+      }
+    }
+  }
+
+  private boolean filterRow() {
+    return filter != null
+        && filter.filterRow();
+  }
+
+  private boolean filterRowKey(byte[] row) {
+    return filter != null
+        && filter.filterRowKey(row, 0, row.length);
+  }
+
+  protected void nextRow(byte [] currentRow) throws IOException {
+    while (Bytes.equals(currentRow, peekRow())) {
+      this.storeHeap.next(MOCKED_LIST);
+    }
+    resetFilters();
+  }
+
+  private byte[] peekRow() {
+    KeyValue kv = this.storeHeap.peek();
+    return kv == null ? null : kv.getRow();
+  }
+
+  private boolean isStopRow(byte [] currentRow) {
+    return currentRow == null ||
+        (stopRow != null &&
+         comparator.compareRows(stopRow, 0, stopRow.length,
+             currentRow, 0, currentRow.length) <= isScan);
+  }
+
+  @Override
+  public synchronized void close() {
+    if (storeHeap != null) {
+      storeHeap.close();
+      storeHeap = null;
+    }
+    // no need to synchronize here.
+    scannerReadPoints.remove(this);
+    this.filterClosed = true;
+  }
+
+  KeyValueHeap getStoreHeapForTesting() {
+    return storeHeap;
+  }
+
+  /**
+   * Get the original scan object that was used to create this internal one.
+   * @return the original scan object; used for debug output
+   */
+  public Scan getOriginalScan() {
+    return originalScan;
+  }
+}
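The prefetch flow in nextRows() above is a single-slot pipeline: serve the previously submitted Future, then submit the next batch before returning, so batch N+1 is computed while the client consumes batch N. A stripped-down, self-contained sketch of the pattern (plain java.util.concurrent; Prefetcher and nextBatch are made-up names, not the commit's API):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    final class Prefetcher<T> {
      private final ExecutorService pool = Executors.newSingleThreadExecutor();
      private Future<T> next; // the single in-flight prefetch, if any

      // Returns one batch, then schedules computation of the following one.
      T nextBatch(Callable<T> fetch) throws Exception {
        T result = (next != null)
            ? next.get()    // consume the batch prefetched on the last call
            : fetch.call(); // first call (or prefetch disabled): fetch inline
        next = pool.submit(fetch); // pipeline the next batch
        return result;
      }

      void close() {
        pool.shutdownNow();
      }
    }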
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Mar 23 18:18:39 2013
@@ -392,8 +392,8 @@ public class Store extends SchemaConfigu
     }
     // initialize the thread pool for opening store files in parallel..
     ThreadPoolExecutor storeFileOpenerThreadPool =
-      this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
-          this.family.getNameAsString());
+      StoreThreadUtils.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" +
+          this.family.getNameAsString(), this.getHRegionInfo(), this.conf);
     CompletionService<StoreFile> completionService =
       new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);

@@ -587,9 +587,9 @@ public class Store extends SchemaConfigu
     storefiles = ImmutableList.of();
     if (!result.isEmpty()) {
       // initialize the thread pool for closing store files in parallel.
-      ThreadPoolExecutor storeFileCloserThreadPool = this.region
-          .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
-              + this.family.getNameAsString());
+      ThreadPoolExecutor storeFileCloserThreadPool =
+          StoreThreadUtils.getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
+              + this.family.getNameAsString(), this.getHRegionInfo(), this.conf);

       // close each store file in parallel
       CompletionService<Void> completionService =

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java?rev=1460199&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreThreadUtils.java Sat Mar 23 18:18:39 2013
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.util.Threads;
+
+/**
+ * Thread utilities used to create the thread pools that open and close
+ * stores and store files.
+ */
+public class StoreThreadUtils {
+  public static ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
+      final String threadNamePrefix, HRegionInfo regionInfo, Configuration conf) {
+    int numStores = Math.max(1, regionInfo.getTableDesc().families.size());
+    int maxThreads = Math.min(numStores,
+        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
+    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  public static ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
+      final String threadNamePrefix, HRegionInfo regionInfo, Configuration conf) {
+    int numStores = Math.max(1, regionInfo.getTableDesc().families.size());
+    int maxThreads = Math.max(1,
+        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
+            / numStores);
+    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  private static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+      final String threadNamePrefix) {
+    return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+        new ThreadFactory() {
+          private int count = 1;
+
+          public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+            t.setDaemon(true);
+            return t;
+          }
+        });
+  }
+}
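For review, the sizing rules above: the store pool is capped at min(numStores, configured max), while the store-file pool divides the configured max evenly across stores, with at least one thread each. A quick self-contained check of the arithmetic (the values are hypothetical; the cap normally comes from hbase.hstore.open.and.close.threads.max):

    public class PoolSizingDemo {
      public static void main(String[] args) {
        int configuredMax = 8; // hypothetical HSTORE_OPEN_AND_CLOSE_THREADS_MAX
        int numStores = 3;     // hypothetical number of column families

        // Store pool: one thread per store, but never more than the cap.
        int storeThreads = Math.min(numStores, configuredMax);         // -> 3

        // Store-file pool: split the cap across stores, at least 1 each.
        int storeFileThreads = Math.max(1, configuredMax / numStores); // -> 2

        System.out.println(storeThreads + " store threads, "
            + storeFileThreads + " store-file threads per store");
      }
    }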
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Sat Mar 23 18:18:39 2013
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.filter.Fi
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -1417,7 +1416,7 @@ public class TestHRegion extends HBaseTe
     region.put(put);

     Scan scan = null;
-    HRegion.RegionScanner is = null;
+    RegionScanner is = null;

     // Testing to see how many scanners are produced by getScanner, starting
     // with a known number: 2 - current = 1

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java Sat Mar 23 18:18:39 2013
@@ -97,7 +97,7 @@ public class TestScanWithBloomError {
     LOG.info("Scanning column set: " + Arrays.toString(colSet));
     Scan scan = new Scan(ROW_BYTES, ROW_BYTES);
     addColumnSetToScan(scan, colSet);
-    HRegion.RegionScanner scanner = (HRegion.RegionScanner)
+    RegionScanner scanner = (RegionScanner)
         region.getScanner(scan);
     KeyValueHeap storeHeap = scanner.getStoreHeapForTesting();
     assertEquals(0, storeHeap.getHeap().size());

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java?rev=1460199&r1=1460198&r2=1460199&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestWideScanner.java Sat Mar 23 18:18:39 2013
@@ -134,7 +134,7 @@ public class TestWideScanner extends HBa

           // trigger ChangedReadersObservers
           Iterator<KeyValueScanner> scanners =
-              ((HRegion.RegionScanner)s).storeHeap.getHeap().iterator();
+              ((RegionScanner)s).storeHeap.getHeap().iterator();
           while (scanners.hasNext()) {
             StoreScanner ss = (StoreScanner)scanners.next();
             ss.updateReaders();