Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CADC2E350 for ; Thu, 21 Feb 2013 03:07:05 +0000 (UTC) Received: (qmail 88558 invoked by uid 500); 21 Feb 2013 03:07:05 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 88522 invoked by uid 500); 21 Feb 2013 03:07:05 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 88511 invoked by uid 99); 21 Feb 2013 03:07:05 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Feb 2013 03:07:05 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Feb 2013 03:06:55 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4564D23888D2; Thu, 21 Feb 2013 03:06:34 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1448497 - in /hbase/branches/hbase-7290v2: ./ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/... 
Date: Thu, 21 Feb 2013 03:06:33 -0000 To: commits@hbase.apache.org From: jmhsieh@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130221030634.4564D23888D2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jmhsieh Date: Thu Feb 21 03:06:32 2013 New Revision: 1448497 URL: http://svn.apache.org/r1448497 Log: Pre-trunk-merging sync of trunk into snapshots branch Modified: hbase/branches/hbase-7290v2/ (props changed) hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java 
hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Propchange: hbase/branches/hbase-7290v2/ ------------------------------------------------------------------------------ Merged /hbase/trunk:r1448316-1448496 Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Thu Feb 21 03:06:32 2013 @@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.regionser import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Pair; @@ -135,28 +137,63 @@ public abstract class BaseRegionObserver final Store store, final List candidates) throws IOException { } @Override + public void preCompactSelection(final ObserverContext c, + final Store store, final List candidates, final CompactionRequest request) + throws IOException { + preCompactSelection(c, store, candidates); + } + + @Override public void postCompactSelection(final ObserverContext c, final Store store, final ImmutableList selected) { } @Override + public void postCompactSelection(final 
ObserverContext c, + final Store store, final ImmutableList selected, CompactionRequest request) { + postCompactSelection(c, store, selected); + } + + @Override public InternalScanner preCompact(ObserverContext e, final Store store, final InternalScanner scanner, final ScanType scanType) - throws IOException { + throws IOException { return scanner; } @Override - public InternalScanner preCompactScannerOpen(final ObserverContext c, - final Store store, List scanners, final ScanType scanType, - final long earliestPutTs, final InternalScanner s) throws IOException { + public InternalScanner preCompact(ObserverContext e, + final Store store, final InternalScanner scanner, final ScanType scanType, + CompactionRequest request) throws IOException { + return preCompact(e, store, scanner, scanType); + } + + @Override + public InternalScanner preCompactScannerOpen( + final ObserverContext c, final Store store, + List scanners, final ScanType scanType, final long earliestPutTs, + final InternalScanner s) throws IOException { return null; } @Override + public InternalScanner preCompactScannerOpen( + final ObserverContext c, final Store store, + List scanners, final ScanType scanType, final long earliestPutTs, + final InternalScanner s, CompactionRequest request) throws IOException { + return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s); + } + + @Override public void postCompact(ObserverContext e, final Store store, final StoreFile resultFile) throws IOException { } +@Override + public void postCompact(ObserverContext e, final Store store, + final StoreFile resultFile, CompactionRequest request) throws IOException { + postCompact(e, store, resultFile); + } + @Override public void preGetClosestRowBefore(final ObserverContext e, final byte [] row, final byte [] family, final Result result) @@ -351,4 +388,4 @@ public abstract class BaseRegionObserver List> familyPaths, boolean hasLoaded) throws IOException { return hasLoaded; } -} +} \ No newline at end of 
file Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Thu Feb 21 03:06:32 2013 @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.regionser import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -126,87 +127,184 @@ public interface RegionObserver extends final StoreFile resultFile) throws IOException; /** - * Called prior to selecting the {@link StoreFile}s to compact from the list - * of available candidates. To alter the files used for compaction, you may - * mutate the passed in list of candidates. + * Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of + * available candidates. To alter the files used for compaction, you may mutate the passed in list + * of candidates. 
* @param c the environment provided by the region server * @param store the store where compaction is being requested * @param candidates the store files currently available for compaction + * @param request custom compaction request * @throws IOException if an error occurred on the coprocessor */ void preCompactSelection(final ObserverContext c, + final Store store, final List candidates, final CompactionRequest request) + throws IOException; + + /** + * Called prior to selecting the {@link StoreFile}s to compact from the list of available + * candidates. To alter the files used for compaction, you may mutate the passed in list of + * candidates. + * @param c the environment provided by the region server + * @param store the store where compaction is being requested + * @param candidates the store files currently available for compaction + * @throws IOException if an error occurred on the coprocessor + * @deprecated Use {@link #preCompactSelection(ObserverContext, Store, List, Object)} instead + */ + void preCompactSelection(final ObserverContext c, final Store store, final List candidates) throws IOException; /** - * Called after the {@link StoreFile}s to compact have been selected from the - * available candidates. + * Called after the {@link StoreFile}s to compact have been selected from the available + * candidates. + * @param c the environment provided by the region server + * @param store the store being compacted + * @param selected the store files selected to compact + * @param request custom compaction request + */ + void postCompactSelection(final ObserverContext c, + final Store store, final ImmutableList selected, CompactionRequest request); + + /** + * Called after the {@link StoreFile}s to compact have been selected from the available + * candidates. 
* @param c the environment provided by the region server * @param store the store being compacted * @param selected the store files selected to compact + * @param compactionAttributes custom attributes for the compaction + * @deprecated use {@link #postCompactSelection(ObserverContext, Store, ImmutableList, Object)} + * instead. */ + @Deprecated void postCompactSelection(final ObserverContext c, final Store store, final ImmutableList selected); /** - * Called prior to writing the {@link StoreFile}s selected for compaction into - * a new {@code StoreFile}. To override or modify the compaction process, - * implementing classes have two options: + * Called prior to writing the {@link StoreFile}s selected for compaction into a new + * {@code StoreFile}. To override or modify the compaction process, implementing classes have two + * options: *
    - *
  • Wrap the provided {@link InternalScanner} with a custom - * implementation that is returned from this method. The custom scanner - * can then inspect {@link KeyValue}s from the wrapped scanner, applying - * its own policy to what gets written.
  • - *
  • Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} - * and provide a custom implementation for writing of new - * {@link StoreFile}s. Note: any implementations bypassing - * core compaction using this approach must write out new store files - * themselves or the existing data will no longer be available after - * compaction.
  • + *
  • Wrap the provided {@link InternalScanner} with a custom implementation that is returned + * from this method. The custom scanner can then inspect {@link KeyValue}s from the wrapped + * scanner, applying its own policy to what gets written.
  • + *
  • Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a + * custom implementation for writing of new {@link StoreFile}s. Note: any implementations + * bypassing core compaction using this approach must write out new store files themselves or the + * existing data will no longer be available after compaction.
  • *
* @param c the environment provided by the region server * @param store the store being compacted - * @param scanner the scanner over existing data used in the store file - * rewriting + * @param scanner the scanner over existing data used in the store file rewriting * @param scanType type of Scan - * @return the scanner to use during compaction. Should not be {@code null} - * unless the implementation is writing new store files on its own. + * @param request the requested compaction + * @return the scanner to use during compaction. Should not be {@code null} unless the + * implementation is writing new store files on its own. * @throws IOException if an error occurred on the coprocessor */ InternalScanner preCompact(final ObserverContext c, - final Store store, final InternalScanner scanner, - final ScanType scanType) throws IOException; + final Store store, final InternalScanner scanner, final ScanType scanType, + CompactionRequest request) throws IOException; /** - * Called prior to writing the {@link StoreFile}s selected for compaction into - * a new {@code StoreFile} and prior to creating the scanner used to read the - * input files. To override or modify the compaction process, - * implementing classes can return a new scanner to provide the KeyValues to be - * stored into the new {@code StoreFile} or null to perform the default processing. - * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no + * Called prior to writing the {@link StoreFile}s selected for compaction into a new + * {@code StoreFile}. To override or modify the compaction process, implementing classes have two + * options: + *
    + *
  • Wrap the provided {@link InternalScanner} with a custom implementation that is returned + * from this method. The custom scanner can then inspect {@link KeyValue}s from the wrapped + * scanner, applying its own policy to what gets written.
  • + *
  • Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a + * custom implementation for writing of new {@link StoreFile}s. Note: any implementations + * bypassing core compaction using this approach must write out new store files themselves or the + * existing data will no longer be available after compaction.
  • + *
+ * @param c the environment provided by the region server + * @param store the store being compacted + * @param scanner the scanner over existing data used in the store file rewriting + * @param scanType type of Scan + * @param request the requested compaction + * @return the scanner to use during compaction. Should not be {@code null} unless the + * implementation is writing new store files on its own. + * @throws IOException if an error occurred on the coprocessor + * @deprecated use + * {@link #preCompact(ObserverContext, Store, InternalScanner, ScanType, CompactionRequest)} + * instead + */ + @Deprecated + InternalScanner preCompact(final ObserverContext c, + final Store store, final InternalScanner scanner, final ScanType scanType) throws IOException; + + /** + * Called prior to writing the {@link StoreFile}s selected for compaction into a new + * {@code StoreFile} and prior to creating the scanner used to read the input files. To override + * or modify the compaction process, implementing classes can return a new scanner to provide the + * KeyValues to be stored into the new {@code StoreFile} or null to perform the default + * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no * effect in this hook. * @param c the environment provided by the region server * @param store the store being compacted * @param scanners the list {@link StoreFileScanner}s to be read from * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction - * @param earliestPutTs timestamp of the earliest put that was found in any of the involved - * store files + * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store + * files * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain - * @return the scanner to use during compaction. {@code null} if the default implementation - * is to be used. 
+ * @param request the requested compaction + * @return the scanner to use during compaction. {@code null} if the default implementation is to + * be used. * @throws IOException if an error occurred on the coprocessor */ InternalScanner preCompactScannerOpen(final ObserverContext c, final Store store, List scanners, final ScanType scanType, + final long earliestPutTs, final InternalScanner s, CompactionRequest request) + throws IOException; + + /** + * Called prior to writing the {@link StoreFile}s selected for compaction into a new + * {@code StoreFile} and prior to creating the scanner used to read the input files. To override + * or modify the compaction process, implementing classes can return a new scanner to provide the + * KeyValues to be stored into the new {@code StoreFile} or null to perform the default + * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no + * effect in this hook. + * @param c the environment provided by the region server + * @param store the store being compacted + * @param scanners the list {@link StoreFileScanner}s to be read from + * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction + * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store + * files + * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain + * @param request the requested compaction + * @return the scanner to use during compaction. {@code null} if the default implementation is to + * be used. + * @throws IOException if an error occurred on the coprocessor + * @deprecated Use + * {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)} + * instead. 
+ */ + @Deprecated + InternalScanner preCompactScannerOpen(final ObserverContext c, + final Store store, List scanners, final ScanType scanType, final long earliestPutTs, final InternalScanner s) throws IOException; /** - * Called after compaction has completed and the new store file has been - * moved in to place. + * Called after compaction has completed and the new store file has been moved in to place. + * @param c the environment provided by the region server + * @param store the store being compacted + * @param resultFile the new store file written out during compaction + * @param request the requested compaction + * @throws IOException if an error occurred on the coprocessor + */ + void postCompact(final ObserverContext c, final Store store, + StoreFile resultFile, CompactionRequest request) throws IOException; + + /** + * Called after compaction has completed and the new store file has been moved in to place. * @param c the environment provided by the region server * @param store the store being compacted * @param resultFile the new store file written out during compaction * @throws IOException if an error occurred on the coprocessor + * @deprecated Use {@link #postCompact(ObserverContext, Store, StoreFile, CompactionRequest)} + * instead */ + @Deprecated void postCompact(final ObserverContext c, final Store store, StoreFile resultFile) throws IOException; Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original) +++ 
hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Thu Feb 21 03:06:32 2013 @@ -19,7 +19,9 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; @@ -183,23 +185,41 @@ public class CompactSplitThread implemen } } - public synchronized void requestCompaction(final HRegion r, - final String why) throws IOException { - for (Store s : r.getStores().values()) { - requestCompaction(r, s, why, Store.NO_PRIORITY); - } + @Override + public synchronized List requestCompaction(final HRegion r, final String why) + throws IOException { + return requestCompaction(r, why, null); + } + + @Override + public synchronized List requestCompaction(final HRegion r, final String why, + List requests) throws IOException { + return requestCompaction(r, why, Store.NO_PRIORITY, requests); } - public synchronized void requestCompaction(final HRegion r, final Store s, - final String why) throws IOException { - requestCompaction(r, s, why, Store.NO_PRIORITY); + @Override + public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s, + final String why, CompactionRequest request) throws IOException { + return requestCompaction(r, s, why, Store.NO_PRIORITY, request); } - public synchronized void requestCompaction(final HRegion r, final String why, - int p) throws IOException { - for (Store s : r.getStores().values()) { - requestCompaction(r, s, why, p); + @Override + public synchronized List requestCompaction(final HRegion r, final String why, + int p, List requests) throws IOException { + // not a special compaction request, so make our own list + List ret; + if (requests == null) { + ret = new ArrayList(r.getStores().size()); + for (Store s : r.getStores().values()) { + 
ret.add(requestCompaction(r, s, why, p, null)); + } + } else { + ret = new ArrayList(requests.size()); + for (CompactionRequest request : requests) { + requests.add(requestCompaction(r, request.getStore(), why, p, request)); + } } + return ret; } /** @@ -207,13 +227,15 @@ public class CompactSplitThread implemen * @param s Store to request compaction on * @param why Why compaction requested -- used in debug messages * @param priority override the default priority (NO_PRIORITY == decide) + * @param request custom compaction request. Can be null in which case a simple + * compaction will be used. */ - public synchronized void requestCompaction(final HRegion r, final Store s, - final String why, int priority) throws IOException { + public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s, + final String why, int priority, CompactionRequest request) throws IOException { if (this.server.isStopped()) { - return; + return null; } - CompactionRequest cr = s.requestCompaction(priority); + CompactionRequest cr = s.requestCompaction(priority, request); if (cr != null) { cr.setServer(server); if (priority != Store.NO_PRIORITY) { @@ -234,6 +256,7 @@ public class CompactSplitThread implemen " because compaction request was cancelled"); } } + return cr; } /** Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java Thu Feb 21 03:06:32 2013 @@ -19,42 +19,73 @@ 
package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.List; + import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @InterfaceAudience.Private public interface CompactionRequestor { /** * @param r Region to compact * @param why Why compaction was requested -- used in debug messages + * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no + * compactions were started * @throws IOException */ - public void requestCompaction(final HRegion r, final String why) throws IOException; + public List requestCompaction(final HRegion r, final String why) + throws IOException; /** * @param r Region to compact - * @param s Store within region to compact * @param why Why compaction was requested -- used in debug messages + * @param requests custom compaction requests. Each compaction must specify the store on which it + * is acting. Can be null in which case a compaction will be attempted on all + * stores for the region. + * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no + * compactions were started * @throws IOException */ - public void requestCompaction(final HRegion r, final Store s, final String why) + public List requestCompaction(final HRegion r, final String why, + List requests) throws IOException; /** * @param r Region to compact + * @param s Store within region to compact + * @param why Why compaction was requested -- used in debug messages + * @param request custom compaction request for the {@link HRegion} and {@link Store}. Custom + * request must be null or be constructed with matching region and store. + * @return The created {@link CompactionRequest} or null if no compaction was started. 
+ * @throws IOException + */ + public CompactionRequest requestCompaction(final HRegion r, final Store s, final String why, + CompactionRequest request) throws IOException; + + /** + * @param r Region to compact * @param why Why compaction was requested -- used in debug messages * @param pri Priority of this compaction. minHeap. <=0 is critical + * @param requests custom compaction requests. Each compaction must specify the store on which it + * is acting. Can be null in which case a compaction will be attempted on all + * stores for the region. + * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no + * compactions were started. * @throws IOException */ - public void requestCompaction(final HRegion r, final String why, int pri) throws IOException; + public List requestCompaction(final HRegion r, final String why, int pri, + List requests) throws IOException; /** * @param r Region to compact * @param s Store within region to compact * @param why Why compaction was requested -- used in debug messages * @param pri Priority of this compaction. minHeap. <=0 is critical + * @param request custom compaction request to run. {@link Store} and {@link HRegion} for the + * request must match the region and store specified here. 
+ * @return The created {@link CompactionRequest} or null if no compaction was started * @throws IOException */ - public void requestCompaction(final HRegion r, final Store s, - final String why, int pri) throws IOException; - + public CompactionRequest requestCompaction(final HRegion r, final Store s, final String why, + int pri, CompactionRequest request) throws IOException; } Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Feb 21 03:06:32 2013 @@ -1375,17 +1375,17 @@ public class HRegionServer implements Cl try { if (s.needsCompaction()) { // Queue a compaction. Will recognize if major is needed. 
- this.instance.compactSplitThread.requestCompaction(r, s, - getName() + " requests compaction"); + this.instance.compactSplitThread.requestCompaction(r, s, getName() + + " requests compaction", null); } else if (s.isMajorCompaction()) { - if (majorCompactPriority == DEFAULT_PRIORITY || - majorCompactPriority > r.getCompactPriority()) { - this.instance.compactSplitThread.requestCompaction(r, s, - getName() + " requests major compaction; use default priority"); + if (majorCompactPriority == DEFAULT_PRIORITY + || majorCompactPriority > r.getCompactPriority()) { + this.instance.compactSplitThread.requestCompaction(r, s, getName() + + " requests major compaction; use default priority", null); } else { - this.instance.compactSplitThread.requestCompaction(r, s, - getName() + " requests major compaction; use configured priority", - this.majorCompactPriority); + this.instance.compactSplitThread.requestCompaction(r, s, getName() + + " requests major compaction; use configured priority", + this.majorCompactPriority, null); } } } catch (IOException e) { @@ -1692,7 +1692,7 @@ public class HRegionServer implements Cl // Do checks to see if we need to compact (references or too many files) for (Store s : r.getStores().values()) { if (s.hasReferences() || s.needsCompaction()) { - getCompactionRequester().requestCompaction(r, s, "Opening Region"); + getCompactionRequester().requestCompaction(r, s, "Opening Region", null); } } long openSeqNum = r.getOpenSeqNum(); @@ -3657,10 +3657,10 @@ public class HRegionServer implements Cl String log = "User-triggered " + (major ? 
"major " : "") + "compaction" + familyLogMsg; if(family != null) { compactSplitThread.requestCompaction(region, store, log, - Store.PRIORITY_USER); + Store.PRIORITY_USER, null); } else { compactSplitThread.requestCompaction(region, log, - Store.PRIORITY_USER); + Store.PRIORITY_USER, null); } return CompactRegionResponse.newBuilder().build(); } catch (IOException ie) { @@ -4062,4 +4062,11 @@ public class HRegionServer implements Cl String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC); return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation); } + + /** + * @return the underlying {@link CompactSplitThread} for the servers + */ + public CompactSplitThread getCompactSplitThread() { + return this.compactSplitThread; + } } Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Feb 21 03:06:32 2013 @@ -1090,14 +1090,13 @@ public class HStore implements Store { List sfs = new ArrayList(); long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis(); try { - List newFiles = - this.compactor.compact(filesToCompact, cr.isMajor()); + List newFiles = this.compactor.compact(cr); // Move the compaction into place. 
if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { for (Path newFile: newFiles) { StoreFile sf = completeCompaction(filesToCompact, newFile); if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postCompact(this, sf); + region.getCoprocessorHost().postCompact(this, sf, cr); } sfs.add(sf); } @@ -1181,13 +1180,12 @@ public class HStore implements Store { try { // Ready to go. Have list of files to compact. - List newFiles = - this.compactor.compact(filesToCompact, isMajor); + List newFiles = this.compactor.compactForTesting(filesToCompact, isMajor); for (Path newFile: newFiles) { // Move the compaction into place. StoreFile sf = completeCompaction(filesToCompact, newFile); if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postCompact(this, sf); + region.getCoprocessorHost().postCompact(this, sf, null); } } } finally { @@ -1219,17 +1217,19 @@ public class HStore implements Store { return compactionPolicy.isMajorCompaction(this.storeFileManager.getStorefiles()); } + @Override public CompactionRequest requestCompaction() throws IOException { - return requestCompaction(Store.NO_PRIORITY); + return requestCompaction(Store.NO_PRIORITY, null); } - public CompactionRequest requestCompaction(int priority) throws IOException { + @Override + public CompactionRequest requestCompaction(int priority, CompactionRequest request) + throws IOException { // don't even select for compaction if writes are disabled if (!this.region.areWritesEnabled()) { return null; } - CompactionRequest ret = null; this.lock.readLock().lock(); try { List candidates = Lists.newArrayList(storeFileManager.getStorefiles()); @@ -1238,7 +1238,7 @@ public class HStore implements Store { candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting); boolean override = false; if (region.getCoprocessorHost() != null) { - override = region.getCoprocessorHost().preCompactSelection(this, candidates); + override = 
region.getCoprocessorHost().preCompactSelection(this, candidates, request); } CompactSelection filesToCompact; if (override) { @@ -1257,7 +1257,7 @@ public class HStore implements Store { if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postCompactSelection(this, - ImmutableList.copyOf(filesToCompact.getFilesToCompact())); + ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request); } // no files to compact @@ -1287,15 +1287,24 @@ public class HStore implements Store { // everything went better than expected. create a compaction request int pri = getCompactPriority(priority); - ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri); + //not a special compaction request, so we need to make one + if(request == null){ + request = new CompactionRequest(region, this, filesToCompact, isMajor, pri); + }else{ + //update the request with what the system thinks the request should be + //its up to the request if it wants to listen + request.setSelection(filesToCompact); + request.setIsMajor(isMajor); + request.setPriority(pri); + } } } finally { this.lock.readLock().unlock(); } - if (ret != null) { - this.region.reportCompactionRequestStart(ret.isMajor()); + if (request != null) { + this.region.reportCompactionRequestStart(request.isMajor()); } - return ret; + return request; } public void finishRequest(CompactionRequest cr) { Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu 
Feb 21 03:06:32 2013 @@ -586,7 +586,9 @@ public class MemStore implements HeapSiz // which means we can prove that no scanner will see this version // false means there was a change, so give us the size. - addedSize -= heapSizeChange(cur, true); + long delta = heapSizeChange(cur, true); + addedSize -= delta; + this.size.addAndGet(-delta); it.remove(); } else { versionsVisible++; Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Thu Feb 21 03:06:32 2013 @@ -55,6 +55,8 @@ import org.apache.hadoop.hbase.coprocess import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; @@ -345,11 +347,10 @@ public class RegionCoprocessorHost /** * See - * {@link RegionObserver#preCompactScannerOpen(ObserverContext, - * Store, List, ScanType, long, InternalScanner)} + * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)} */ public InternalScanner preCompactScannerOpen(Store store, List 
scanners, - ScanType scanType, long earliestPutTs) throws IOException { + ScanType scanType, long earliestPutTs, CompactionRequest request) throws IOException { ObserverContext ctx = null; InternalScanner s = null; for (RegionEnvironment env: coprocessors) { @@ -357,7 +358,7 @@ public class RegionCoprocessorHost ctx = ObserverContext.createAndPrepare(env, ctx); try { s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, scanners, - scanType, earliestPutTs, s); + scanType, earliestPutTs, s, request); } catch (Throwable e) { handleCoprocessorThrowable(env,e); } @@ -370,22 +371,23 @@ public class RegionCoprocessorHost } /** - * Called prior to selecting the {@link StoreFile}s for compaction from - * the list of currently available candidates. + * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently + * available candidates. * @param store The store where compaction is being requested * @param candidates The currently available store files + * @param request custom compaction request * @return If {@code true}, skip the normal selection process and use the current list * @throws IOException */ - public boolean preCompactSelection(Store store, List candidates) throws IOException { + public boolean preCompactSelection(Store store, List candidates, + CompactionRequest request) throws IOException { ObserverContext ctx = null; boolean bypass = false; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); try { - ((RegionObserver)env.getInstance()).preCompactSelection( - ctx, store, candidates); + ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates, request); } catch (Throwable e) { handleCoprocessorThrowable(env,e); @@ -400,20 +402,20 @@ public class RegionCoprocessorHost } /** - * Called after the {@link StoreFile}s to be compacted have been selected - * from the available candidates. 
+ * Called after the {@link StoreFile}s to be compacted have been selected from the available + * candidates. * @param store The store where compaction is being requested * @param selected The store files selected to compact + * @param request custom compaction */ - public void postCompactSelection(Store store, - ImmutableList selected) { + public void postCompactSelection(Store store, ImmutableList selected, + CompactionRequest request) { ObserverContext ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); try { - ((RegionObserver)env.getInstance()).postCompactSelection( - ctx, store, selected); + ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected, request); } catch (Throwable e) { handleCoprocessorThrowableNoRethrow(env,e); } @@ -429,18 +431,19 @@ public class RegionCoprocessorHost * @param store the store being compacted * @param scanner the scanner used to read store data during compaction * @param scanType type of Scan + * @param request the compaction that will be executed * @throws IOException */ - public InternalScanner preCompact(Store store, InternalScanner scanner, - ScanType scanType) throws IOException { + public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType, + CompactionRequest request) throws IOException { ObserverContext ctx = null; boolean bypass = false; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); try { - scanner = ((RegionObserver)env.getInstance()).preCompact( - ctx, store, scanner, scanType); + scanner = ((RegionObserver) env.getInstance()).preCompact(ctx, store, scanner, scanType, + request); } catch (Throwable e) { handleCoprocessorThrowable(env,e); } @@ -457,15 +460,17 @@ public class RegionCoprocessorHost * Called after the store compaction has completed. 
* @param store the store being compacted * @param resultFile the new store file written during compaction + * @param request the compaction that is being executed * @throws IOException */ - public void postCompact(Store store, StoreFile resultFile) throws IOException { + public void postCompact(Store store, StoreFile resultFile, CompactionRequest request) + throws IOException { ObserverContext ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { ctx = ObserverContext.createAndPrepare(env, ctx); try { - ((RegionObserver)env.getInstance()).postCompact(ctx, store, resultFile); + ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request); } catch (Throwable e) { handleCoprocessorThrowable(env, e); } Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Feb 21 03:06:32 2013 @@ -160,7 +160,8 @@ public interface Store extends HeapSize, public CompactionRequest requestCompaction() throws IOException; - public CompactionRequest requestCompaction(int priority) throws IOException; + public CompactionRequest requestCompaction(int priority, CompactionRequest request) + throws IOException; public void finishRequest(CompactionRequest cr); Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java URL: 
http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Thu Feb 21 03:06:32 2013 @@ -19,20 +19,22 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.RemoteExceptionHandler; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils; @@ -46,35 +48,51 @@ import com.google.common.collect.Collect /** * This class holds all details necessary to run a compaction. 
*/ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate({ "coprocessor" }) +@InterfaceStability.Evolving public class CompactionRequest implements Comparable, Runnable { static final Log LOG = LogFactory.getLog(CompactionRequest.class); private final HRegion region; private final HStore store; - private final CompactSelection compactSelection; - private final long totalSize; - private final boolean isMajor; + private CompactSelection compactSelection; + private long totalSize; + private boolean isMajor; private int priority; private final Long timeInNanos; private HRegionServer server = null; - public CompactionRequest(HRegion region, HStore store, - CompactSelection files, boolean isMajor, int priority) { - Preconditions.checkNotNull(region); - Preconditions.checkNotNull(files); + public static CompactionRequest getRequestForTesting(Collection selection, + boolean isMajor) { + return new CompactionRequest(null, null, new CompactSelection(new ArrayList( + selection)), isMajor, 0, System.nanoTime()); + } + + /** + * Constructor for a custom compaction. Uses the setXXX methods to update the state of the + * compaction before being used. 
+ */ + public CompactionRequest(HRegion region, HStore store, int priority) { + this(region, store, null, false, priority, System + .nanoTime()); + } + + public CompactionRequest(HRegion r, HStore s, CompactSelection files, boolean isMajor, int p) { + // delegate to the internal constructor after checking basic preconditions + this(Preconditions.checkNotNull(r), s, Preconditions.checkNotNull(files), isMajor, p, System + .nanoTime()); + } + private CompactionRequest(HRegion region, HStore store, CompactSelection files, boolean isMajor, + int priority, long startTime) { this.region = region; this.store = store; - this.compactSelection = files; - long sz = 0; - for (StoreFile sf : files.getFilesToCompact()) { - sz += sf.getReader().length(); - } - this.totalSize = sz; this.isMajor = isMajor; this.priority = priority; - this.timeInNanos = System.nanoTime(); + this.timeInNanos = startTime; + if (files != null) { + this.setSelection(files); + } } /** @@ -162,6 +180,28 @@ public class CompactionRequest implement this.server = hrs; } + /** + * Set the files (and, implicitly, the size of the compaction based on those files) + * @param files files that should be included in the compaction + */ + public void setSelection(CompactSelection files) { + long sz = 0; + for (StoreFile sf : files.getFilesToCompact()) { + sz += sf.getReader().length(); + } + this.totalSize = sz; + this.compactSelection = files; + } + + /** + * Specify if this compaction should be a major compaction based on the state of the store + * @param isMajor true if the system determines that this compaction should be a major + * compaction + */ + public void setIsMajor(boolean isMajor) { + this.isMajor = isMajor; + } + @Override public String toString() { String fsList = Joiner.on(", ").join( @@ -200,12 +240,11 @@ public class CompactionRequest implement if (completed) { // degenerate case: blocked regions require recursive enqueues if (store.getCompactPriority() <= 0) { - server.compactSplitThread - 
.requestCompaction(region, store, "Recursive enqueue"); - } else { - // see if the compaction has caused us to exceed max region size - server.compactSplitThread.requestSplit(region); - } + server.compactSplitThread.requestCompaction(region, store, "Recursive enqueue", null); + } else { + // see if the compaction has caused us to exceed max region size + server.getCompactSplitThread().requestSplit(region); + } } } catch (IOException ex) { LOG.error("Compaction failed " + this, RemoteExceptionHandler @@ -234,4 +273,4 @@ public class CompactionRequest implement } } } - } +} Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Thu Feb 21 03:06:32 2013 @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -41,15 +42,28 @@ public abstract class Compactor { /** * Do a minor/major compaction on an explicit set of storefiles from a Store. - * * @param filesToCompact which files to compact - * @param majorCompaction true to major compact (prune all deletes, max versions, etc) - * @return Product of compaction or an empty list if all cells expired or deleted and - * nothing made it through the compaction. 
+ * @param request the requested compaction + * @return Product of compaction or an empty list if all cells expired or deleted and nothing made + * it through the compaction. * @throws IOException */ - public abstract List compact(final Collection filesToCompact, - final boolean majorCompaction) throws IOException; + public abstract List compact(final CompactionRequest request) throws IOException; + + /** + * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to + * {@link #compact(CompactionRequest)}; + * @param filesToCompact the files to compact. These are used as the compactionSelection for the + * generated {@link CompactionRequest}. + * @param isMajor true to major compact (prune all deletes, max versions, etc) + * @return Product of compaction or an empty list if all cells expired or deleted and nothing made + * it through the compaction. + * @throws IOException + */ + public List compactForTesting(final Collection filesToCompact, boolean isMajor) + throws IOException { + return compact(CompactionRequest.getRequestForTesting(filesToCompact, isMajor)); + } public CompactionProgress getProgress() { return this.progress; Modified: hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java Thu Feb 21 03:06:32 2013 @@ -59,16 +59,12 @@ public class DefaultCompactor extends Co /** * Do a minor/major compaction on an 
explicit set of storefiles from a Store. - * - * @param filesToCompact which files to compact - * @param majorCompaction true to major compact (prune all deletes, max versions, etc) - * @return Product of compaction or an empty list if all cells expired or deleted and - * nothing made it through the compaction. - * @throws IOException */ @SuppressWarnings("deprecation") - public List compact(final Collection filesToCompact, - final boolean majorCompaction) throws IOException { + @Override + public List compact(final CompactionRequest request) throws IOException { + final Collection filesToCompact = request.getFiles(); + boolean majorCompaction = request.isMajor(); // Max-sequenceID is the last key in the files we're compacting long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true); @@ -139,7 +135,8 @@ public class DefaultCompactor extends Co scanner = store .getCoprocessorHost() .preCompactScannerOpen(store, scanners, - majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs); + majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs, + request); } ScanType scanType = majorCompaction? 
ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT; if (scanner == null) { @@ -150,11 +147,11 @@ public class DefaultCompactor extends Co scanType, smallestReadPoint, earliestPutTs); } if (store.getCoprocessorHost() != null) { - InternalScanner cpScanner = - store.getCoprocessorHost().preCompact(store, scanner, scanType); + InternalScanner cpScanner = store.getCoprocessorHost().preCompact(store, scanner, + scanType, request); // NULL scanner returned from coprocessor hooks means skip normal processing if (cpScanner == null) { - return newFiles; // an empty list + return newFiles; // an empty list } scanner = cpScanner; } Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Thu Feb 21 03:06:32 2013 @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,13 +48,17 @@ import org.apache.hadoop.hbase.client.Pu import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; +import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl; import org.apache.hadoop.hbase.io.hfile.HFileScanner; 
-import org.apache.hadoop.hbase.regionserver.compactions.*; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -591,7 +596,7 @@ public class TestCompaction extends HBas Collection storeFiles = store.getStorefiles(); Compactor tool = store.compactor; - List newFiles = tool.compact(storeFiles, false); + List newFiles = tool.compactForTesting(storeFiles, false); // Now lets corrupt the compacted file. FileSystem fs = FileSystem.get(conf); @@ -630,7 +635,7 @@ public class TestCompaction extends HBas } store.triggerMajorCompaction(); - CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY); + CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null); assertNotNull("Expected to receive a compaction request", request); assertEquals( "System-requested major compaction should not occur if there are too many store files", @@ -648,7 +653,7 @@ public class TestCompaction extends HBas createStoreFile(r); } store.triggerMajorCompaction(); - CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER); + CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null); assertNotNull("Expected to receive a compaction request", request); assertEquals( "User-requested major compaction should always occur, even if there are too many store files", @@ -656,5 +661,53 @@ public class TestCompaction extends HBas request.isMajor()); } -} + /** + * Create a custom compaction request and be sure that we can track it through 
the queue, knowing + * when the compaction is completed. + */ + public void testTrackingCompactionRequest() throws Exception { + // setup a compact/split thread on a mock server + HRegionServer mockServer = Mockito.mock(HRegionServer.class); + Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); + CompactSplitThread thread = new CompactSplitThread(mockServer); + Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); + + // setup a region/store with some files + Store store = r.getStore(COLUMN_FAMILY); + createStoreFile(r); + for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) { + createStoreFile(r); + } + + CountDownLatch latch = new CountDownLatch(1); + TrackableCompactionRequest request = new TrackableCompactionRequest(r, (HStore) store, latch); + thread.requestCompaction(r, store, "test custom compaction", Store.PRIORITY_USER, request); + // wait for the latch to complete. + latch.await(); + thread.interruptIfNecessary(); + } + + /** + * Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes. + */ + public static class TrackableCompactionRequest extends CompactionRequest { + private CountDownLatch done; + + /** + * Constructor for a custom compaction. Uses the setXXX methods to update the state of the + * compaction before being used.
+ */ + public TrackableCompactionRequest(HRegion region, HStore store, CountDownLatch finished) { + super(region, store, Store.PRIORITY_USER); + this.done = finished; + } + + @Override + public void run() { + super.run(); + this.done.countDown(); + } + } + +} Modified: hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=1448497&r1=1448496&r2=1448497&view=diff ============================================================================== --- hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original) +++ hbase/branches/hbase-7290v2/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Thu Feb 21 03:06:32 2013 @@ -847,6 +847,36 @@ public class TestMemStore extends TestCa } /** + * Adds keyvalues with a fixed memstoreTs, and checks that memstore size is decreased
+ * @throws Exception + */ + public void testUpsertMemstoreSize() throws Exception { + Configuration conf = HBaseConfiguration.create(); + memstore = new MemStore(conf, KeyValue.COMPARATOR); + long oldSize = memstore.size.get(); + + List l = new ArrayList(); + KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); + KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v"); + KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v"); + + kv1.setMvccVersion(1); kv2.setMvccVersion(1);kv3.setMvccVersion(1); + l.add(kv1); l.add(kv2); l.add(kv3); + + this.memstore.upsert(l, 2);// readpoint is 2 + long newSize = this.memstore.size.get(); + assert(newSize > oldSize); + + KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); + kv4.setMvccVersion(1); + l.clear(); l.add(kv4); + this.memstore.upsert(l, 3); + assertEquals(newSize, this.memstore.size.get()); + //this.memstore = null; + } + + /** * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} * @param hmc Instance to add rows to. * @return How many rows we added.