Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3B50CEF08 for ; Tue, 26 Feb 2013 20:52:34 +0000 (UTC) Received: (qmail 67643 invoked by uid 500); 26 Feb 2013 20:52:34 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 67616 invoked by uid 500); 26 Feb 2013 20:52:34 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 67608 invoked by uid 99); 26 Feb 2013 20:52:34 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Feb 2013 20:52:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Feb 2013 20:52:23 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7165B23888EA; Tue, 26 Feb 2013 20:52:01 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1450407 [1/2] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ test/java/org/apache/hadoop/hbase/... Date: Tue, 26 Feb 2013 20:52:00 -0000 To: commits@hbase.apache.org From: sershe@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130226205201.7165B23888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: sershe Date: Tue Feb 26 20:51:59 2013 New Revision: 1450407 URL: http://svn.apache.org/r1450407 Log: HBASE-7843 Enable encapsulating compaction policy/compactor/store file manager interaction shenanigans Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java Removed: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/PerfTestCompactionPolicies.java Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Tue Feb 26 20:51:59 2013 @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.regionser import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Feb 26 20:51:59 2013 @@ -26,6 +26,7 @@ import java.util.concurrent.BlockingQueu import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -34,7 +35,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; @@ -89,8 +95,7 @@ public class CompactSplitThread implemen return t; } }); - this.largeCompactions - .setRejectedExecutionHandler(new CompactionRequest.Rejection()); + this.largeCompactions.setRejectedExecutionHandler(new Rejection()); this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, new PriorityBlockingQueue(), new ThreadFactory() { @@ -102,7 +107,7 @@ public class CompactSplitThread implemen } }); this.smallCompactions - .setRejectedExecutionHandler(new CompactionRequest.Rejection()); + .setRejectedExecutionHandler(new Rejection()); this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() { @@ -193,7 +198,7 @@ public class CompactSplitThread implemen @Override public synchronized List requestCompaction(final HRegion r, final String why, - List requests) throws IOException { + List> requests) throws IOException { return requestCompaction(r, why, Store.NO_PRIORITY, requests); } @@ -205,7 +210,7 @@ public class CompactSplitThread implemen @Override public synchronized List requestCompaction(final HRegion r, final String why, - int p, List requests) throws IOException { + int p, List> requests) throws IOException { // not a special compaction request, so make our own list List ret; if (requests == null) { @@ -215,8 +220,8 @@ public class CompactSplitThread implemen } } else { ret = new ArrayList(requests.size()); - for (CompactionRequest request : requests) { - requests.add(requestCompaction(r, request.getStore(), why, p, request)); + for (Pair pair : requests) { + ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst())); } } return ret; @@ -235,28 +240,29 @@ public class CompactSplitThread implemen if (this.server.isStopped()) { return null; } - CompactionRequest cr = s.requestCompaction(priority, request); - if (cr != null) { - cr.setServer(server); - if (priority != Store.NO_PRIORITY) { - cr.setPriority(priority); - } - ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize()) - ? largeCompactions : smallCompactions; - pool.execute(cr); - if (LOG.isDebugEnabled()) { - String type = (pool == smallCompactions) ? "Small " : "Large "; - LOG.debug(type + "Compaction requested: " + cr - + (why != null && !why.isEmpty() ? "; Because: " + why : "") - + "; " + this); - } - } else { + CompactionContext compaction = s.requestCompaction(priority, request); + if (compaction == null) { if(LOG.isDebugEnabled()) { LOG.debug("Not compacting " + r.getRegionNameAsString() + " because compaction request was cancelled"); } + return null; + } + + assert compaction.hasSelection(); + if (priority != Store.NO_PRIORITY) { + compaction.getRequest().setPriority(priority); } - return cr; + ThreadPoolExecutor pool = s.throttleCompaction(compaction.getRequest().getSize()) + ? largeCompactions : smallCompactions; + pool.execute(new CompactionRunner(s, r, compaction)); + if (LOG.isDebugEnabled()) { + String type = (pool == smallCompactions) ? "Small " : "Large "; + LOG.debug(type + "Compaction requested: " + compaction + + (why != null && !why.isEmpty() ? "; Because: " + why : "") + + "; " + this); + } + return compaction.getRequest(); } /** @@ -309,4 +315,73 @@ public class CompactSplitThread implemen public int getRegionSplitLimit() { return this.regionSplitLimit; } + + private class CompactionRunner implements Runnable, Comparable { + private final Store store; + private final HRegion region; + private final CompactionContext compaction; + + public CompactionRunner(Store store, HRegion region, CompactionContext compaction) { + super(); + this.store = store; + this.region = region; + this.compaction = compaction; + } + + @Override + public void run() { + Preconditions.checkNotNull(server); + if (server.isStopped()) { + return; + } + this.compaction.getRequest().beforeExecute(); + try { + // Note: please don't put single-compaction logic here; + // put it into region/store/etc. This is CST logic. + long start = EnvironmentEdgeManager.currentTimeMillis(); + boolean completed = region.compact(compaction, store); + long now = EnvironmentEdgeManager.currentTimeMillis(); + LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " + + this + "; duration=" + StringUtils.formatTimeDiff(now, start)); + if (completed) { + // degenerate case: blocked regions require recursive enqueues + if (store.getCompactPriority() <= 0) { + requestCompaction(region, store, "Recursive enqueue", null); + } else { + // see if the compaction has caused us to exceed max region size + requestSplit(region); + } + } + } catch (IOException ex) { + LOG.error("Compaction failed " + this, RemoteExceptionHandler.checkIOException(ex)); + server.checkFileSystem(); + } catch (Exception ex) { + LOG.error("Compaction failed " + this, ex); + server.checkFileSystem(); + } finally { + LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this); + } + this.compaction.getRequest().afterExecute(); + } + + @Override + public int compareTo(CompactionRunner o) { + // Only compare the underlying request, for queue sorting purposes. + return this.compaction.getRequest().compareTo(o.compaction.getRequest()); + } + } + + /** + * Cleanup class to use when rejecting a compaction request from the queue. + */ + private static class Rejection implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) { + if (runnable instanceof CompactionRunner) { + CompactionRunner runner = (CompactionRunner)runnable; + LOG.debug("Compaction Rejected: " + runner); + runner.store.cancelRequestedCompaction(runner.compaction); + } + } + } } Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java Tue Feb 26 20:51:59 2013 @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.Pair; @InterfaceAudience.Private public interface CompactionRequestor { @@ -47,7 +48,7 @@ public interface CompactionRequestor { * @throws IOException */ public List requestCompaction(final HRegion r, final String why, - List requests) + List> requests) throws IOException; /** @@ -74,7 +75,7 @@ public interface CompactionRequestor { * @throws IOException */ public List requestCompaction(final HRegion r, final String why, int pri, - List requests) throws IOException; + List> requests) throws IOException; /** * @param r Region to compact Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java Tue Feb 26 20:51:59 2013 @@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.HDFSBlock import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.mapreduce.JobUtil; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.util.Bytes; @@ -156,8 +156,9 @@ public class CompactionTool extends Conf " family=" + familyDir.getName()); HStore store = getStore(region, familyDir); do { - CompactionRequest cr = store.requestCompaction(); - List storeFiles = store.compact(cr); + CompactionContext compaction = store.requestCompaction(); + if (compaction == null) break; + List storeFiles = store.compact(compaction); if (storeFiles != null && !storeFiles.isEmpty()) { if (keepCompactedFiles && deleteCompacted) { for (StoreFile storeFile: storeFiles) { @@ -465,4 +466,4 @@ public class CompactionTool extends Conf public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args)); } -} \ No newline at end of file +} Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java Tue Feb 26 20:51:59 2013 @@ -18,28 +18,64 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; /** - * Default StoreEngine creates the default compactor, policy, and store file manager. + * Default StoreEngine creates the default compactor, policy, and store file manager, or + * their derivatives. */ @InterfaceAudience.Private -public class DefaultStoreEngine extends StoreEngine { +public class DefaultStoreEngine extends StoreEngine< + DefaultCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> { + public DefaultStoreEngine(Configuration conf, Store store, KVComparator comparator) { super(conf, store, comparator); } @Override - protected void createComponents(PP storeFileManager, - PP compactionPolicy, PP compactor) { - storeFileManager.set(new DefaultStoreFileManager(this.comparator)); - compactionPolicy.set(new DefaultCompactionPolicy(this.conf, this.store)); - compactor.set(new DefaultCompactor(this.conf, this.store)); + protected void createComponents() { + storeFileManager = new DefaultStoreFileManager(this.comparator); + + // TODO: compactor and policy may be separately pluggable, but must derive from default ones. + compactor = new DefaultCompactor(this.conf, this.store); + compactionPolicy = new DefaultCompactionPolicy(this.conf, this.store/*as StoreConfigInfo*/); + } + + @Override + protected CompactionContext createCompactionContext() { + return new DefaultCompactionContext(); + } + + private class DefaultCompactionContext extends CompactionContext { + @Override + public boolean select(List filesCompacting, boolean isUserCompaction, + boolean mayUseOffPeak, boolean forceMajor) throws IOException { + request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), + filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor); + return request != null; + } + + @Override + public List compact() throws IOException { + return compactor.compact(request); + } + + @Override + public List preSelect(List filesCompacting) { + return compactionPolicy.preSelectCompactionForCoprocessor( + storeFileManager.getStorefiles(), filesCompacting); + } } } Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Feb 26 20:51:59 2013 @@ -119,6 +119,7 @@ import org.apache.hadoop.hbase.protobuf. import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; @@ -1291,13 +1292,9 @@ public class HRegion implements HeapSize */ public void compactStores() throws IOException { for (Store s : getStores().values()) { - CompactionRequest cr = s.requestCompaction(); - if(cr != null) { - try { - compact(cr); - } finally { - s.finishRequest(cr); - } + CompactionContext compaction = s.requestCompaction(); + if (compaction != null) { + compact(compaction, s); } } } @@ -1317,45 +1314,46 @@ public class HRegion implements HeapSize * @return whether the compaction completed * @throws IOException e */ - public boolean compact(CompactionRequest cr) - throws IOException { - if (cr == null) { - return false; - } + public boolean compact(CompactionContext compaction, Store store) throws IOException { + assert compaction != null && compaction.hasSelection(); + assert !compaction.getRequest().getFiles().isEmpty(); if (this.closing.get() || this.closed.get()) { LOG.debug("Skipping compaction on " + this + " because closing/closed"); + store.cancelRequestedCompaction(compaction); return false; } - Preconditions.checkArgument(cr.getHRegion().equals(this)); MonitoredTask status = null; + boolean didPerformCompaction = false; // block waiting for the lock for compaction lock.readLock().lock(); try { - status = TaskMonitor.get().createStatus( - "Compacting " + cr.getStore() + " in " + this); + status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this); if (this.closed.get()) { - LOG.debug("Skipping compaction on " + this + " because closed"); + String msg = "Skipping compaction on " + this + " because closed"; + LOG.debug(msg); + status.abort(msg); return false; } - boolean decr = true; + boolean wasStateSet = false; try { synchronized (writestate) { if (writestate.writesEnabled) { + wasStateSet = true; ++writestate.compacting; } else { String msg = "NOT compacting region " + this + ". Writes disabled."; LOG.info(msg); status.abort(msg); - decr = false; return false; } } - LOG.info("Starting compaction on " + cr.getStore() + " in region " - + this + (cr.getCompactSelection().isOffPeakCompaction()?" as an off-peak compaction":"")); + LOG.info("Starting compaction on " + store + " in region " + this + + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":"")); doRegionCompactionPrep(); try { - status.setStatus("Compacting store " + cr.getStore()); - cr.getStore().compact(cr); + status.setStatus("Compacting store " + store); + didPerformCompaction = true; + store.compact(compaction); } catch (InterruptedIOException iioe) { String msg = "compaction interrupted"; LOG.info(msg, iioe); @@ -1363,7 +1361,7 @@ public class HRegion implements HeapSize return false; } } finally { - if (decr) { + if (wasStateSet) { synchronized (writestate) { --writestate.compacting; if (writestate.compacting <= 0) { @@ -1376,6 +1374,7 @@ public class HRegion implements HeapSize return true; } finally { try { + if (!didPerformCompaction) store.cancelRequestedCompaction(compaction); if (status != null) status.cleanup(); } finally { lock.readLock().unlock(); Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Feb 26 20:51:59 2013 @@ -66,7 +66,7 @@ import org.apache.hadoop.hbase.io.hfile. import org.apache.hadoop.hbase.exceptions.InvalidHFileException; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -154,8 +154,8 @@ public class HStore implements Store { // Comparing KeyValues private final KeyValue.KVComparator comparator; - final Compactor compactor; - + final StoreEngine storeEngine; + private OffPeakCompactions offPeakCompactions; private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10; @@ -223,8 +223,11 @@ public class HStore implements Store { "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */); } - StoreEngine engine = StoreEngine.create(this, this.conf, this.comparator); - this.storeFileManager = engine.getStoreFileManager(); + storeEngine = StoreEngine.create(this, this.conf, this.comparator); + // Copy some things to local fields for convenience. + this.storeFileManager = storeEngine.getStoreFileManager(); + this.compactionPolicy = storeEngine.getCompactionPolicy(); + this.storeFileManager.loadFiles(loadStoreFiles()); // Initialize checksum type from name. The names are CRC32, CRC32C, etc. @@ -243,9 +246,6 @@ public class HStore implements Store { + HStore.flush_retries_number); } } - this.compactionPolicy = engine.getCompactionPolicy(); - // Get the compaction tool instance for this policy - this.compactor = engine.getCompactor(); } /** @@ -1067,15 +1067,15 @@ public class HStore implements Store { *

We don't want to hold the structureLock for the whole time, as a compact() * can be lengthy and we want to allow cache-flushes during this period. * - * @param cr - * compaction details obtained from requestCompaction() + * @param compaction compaction details obtained from requestCompaction() * @throws IOException * @return Storefile we compacted into or null if we failed or opted out early. */ - List compact(CompactionRequest cr) throws IOException { - if (cr == null || cr.getFiles().isEmpty()) return null; - Preconditions.checkArgument(cr.getStore().toString().equals(this.toString())); - List filesToCompact = cr.getFiles(); + public List compact(CompactionContext compaction) throws IOException { + assert compaction != null && compaction.hasSelection(); + CompactionRequest cr = compaction.getRequest(); + Collection filesToCompact = cr.getFiles(); + assert !filesToCompact.isEmpty(); synchronized (filesCompacting) { // sanity check: we're compacting files that this store knows about // TODO: change this to LOG.error() after more debugging @@ -1091,16 +1091,20 @@ public class HStore implements Store { List sfs = new ArrayList(); long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis(); try { - List newFiles = this.compactor.compact(cr); + // Commence the compaction. + List newFiles = compaction.compact(); // Move the compaction into place. if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { for (Path newFile: newFiles) { - StoreFile sf = completeCompaction(filesToCompact, newFile); + assert newFile != null; + StoreFile sf = moveFileIntoPlace(newFile); if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postCompact(this, sf, cr); } + assert sf != null; sfs.add(sf); } + completeCompaction(filesToCompact, sfs); } else { for (Path newFile: newFiles) { // Create storefile around what we wrote with a reader on it. @@ -1111,15 +1115,24 @@ public class HStore implements Store { } } } finally { - synchronized (filesCompacting) { - filesCompacting.removeAll(filesToCompact); - } + finishCompactionRequest(cr); } + logCompactionEndMessage(cr, sfs, compactionStartTime); + return sfs; + } + /** + * Log a very elaborate compaction completion message. + * @param cr Request. + * @param sfs Resulting files. + * @param compactionStartTime Start time. + */ + private void logCompactionEndMessage( + CompactionRequest cr, List sfs, long compactionStartTime) { long now = EnvironmentEdgeManager.currentTimeMillis(); StringBuilder message = new StringBuilder( "Completed" + (cr.isMajor() ? " major " : " ") + "compaction of " - + filesToCompact.size() + " file(s) in " + this + " of " + + cr.getFiles().size() + " file(s) in " + this + " of " + this.region.getRegionInfo().getRegionNameAsString() + " into "); if (sfs.isEmpty()) { @@ -1139,7 +1152,23 @@ public class HStore implements Store { .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime)) .append(" to execute."); LOG.info(message.toString()); - return sfs; + } + + // Package-visible for tests + StoreFile moveFileIntoPlace(Path newFile) throws IOException { + validateStoreFile(newFile); + // Move the file into the right spot + Path destPath = new Path(homedir, newFile.getName()); + LOG.info("Renaming compacted file at " + newFile + " to " + destPath); + if (!fs.rename(newFile, destPath)) { + String err = "Failed move of compacted file " + newFile + " to " + destPath; + LOG.error(err); + throw new IOException(err); + } + StoreFile result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf, + this.family.getBloomFilterType(), this.dataBlockEncoder); + result.createReader(); + return result; } /** @@ -1181,13 +1210,17 @@ public class HStore implements Store { try { // Ready to go. Have list of files to compact. - List newFiles = this.compactor.compactForTesting(filesToCompact, isMajor); + List newFiles = + this.storeEngine.getCompactor().compactForTesting(filesToCompact, isMajor); for (Path newFile: newFiles) { // Move the compaction into place. - StoreFile sf = completeCompaction(filesToCompact, newFile); + StoreFile sf = moveFileIntoPlace(newFile); if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postCompact(this, sf, null); } + ArrayList tmp = new ArrayList(); + tmp.add(sf); + completeCompaction(filesToCompact, tmp); } } finally { synchronized (filesCompacting) { @@ -1203,7 +1236,7 @@ public class HStore implements Store { @Override public CompactionProgress getCompactionProgress() { - return this.compactor.getProgress(); + return this.storeEngine.getCompactor().getProgress(); } @Override @@ -1219,100 +1252,102 @@ public class HStore implements Store { } @Override - public CompactionRequest requestCompaction() throws IOException { + public CompactionContext requestCompaction() throws IOException { return requestCompaction(Store.NO_PRIORITY, null); } @Override - public CompactionRequest requestCompaction(int priority, CompactionRequest request) + public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest) throws IOException { // don't even select for compaction if writes are disabled if (!this.region.areWritesEnabled()) { return null; } + CompactionContext compaction = storeEngine.createCompaction(); this.lock.readLock().lock(); try { - List candidates = Lists.newArrayList(storeFileManager.getStorefiles()); synchronized (filesCompacting) { - // First we need to pre-select compaction, and then pre-compact selection! - candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting); - boolean override = false; + // First, see if coprocessor would want to override selection. if (region.getCoprocessorHost() != null) { - override = region.getCoprocessorHost().preCompactSelection(this, candidates, request); + List candidatesForCoproc = compaction.preSelect(this.filesCompacting); + boolean override = region.getCoprocessorHost().preCompactSelection( + this, candidatesForCoproc, baseRequest); + if (override) { + // Coprocessor is overriding normal file selection. + compaction.forceSelect(new CompactionRequest(candidatesForCoproc)); + } } - CompactSelection filesToCompact; - if (override) { - // coprocessor is overriding normal file selection - filesToCompact = new CompactSelection(candidates); - } else { + + // Normal case - coprocessor is not overriding file selection. + if (!compaction.hasSelection()) { boolean isUserCompaction = priority == Store.PRIORITY_USER; boolean mayUseOffPeak = this.offPeakCompactions.tryStartOffPeakRequest(); - filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction, + compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor && filesCompacting.isEmpty()); - if (mayUseOffPeak && !filesToCompact.isOffPeakCompaction()) { - // Compaction policy doesn't want to do anything with off-peak. + assert compaction.hasSelection(); + if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) { + // Compaction policy doesn't want to take advantage of off-peak. this.offPeakCompactions.endOffPeakRequest(); } } - if (region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postCompactSelection(this, - ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request); + region.getCoprocessorHost().postCompactSelection( + this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest); + } + + // Selected files; see if we have a compaction with some custom base request. + if (baseRequest != null) { + // Update the request with what the system thinks the request should be; + // its up to the request if it wants to listen. + compaction.forceSelect( + baseRequest.combineWith(compaction.getRequest())); } - // no files to compact - if (filesToCompact.getFilesToCompact().isEmpty()) { + // Finally, we have the resulting files list. Check if we have any files at all. + final Collection selectedFiles = compaction.getRequest().getFiles(); + if (selectedFiles.isEmpty()) { return null; } - // basic sanity check: do not try to compact the same StoreFile twice. - if (!Collections.disjoint(filesCompacting, filesToCompact.getFilesToCompact())) { + // Update filesCompacting (check that we do not try to compact the same StoreFile twice). + if (!Collections.disjoint(filesCompacting, selectedFiles)) { // TODO: change this from an IAE to LOG.error after sufficient testing Preconditions.checkArgument(false, "%s overlaps with %s", - filesToCompact, filesCompacting); + selectedFiles, filesCompacting); } - filesCompacting.addAll(filesToCompact.getFilesToCompact()); + filesCompacting.addAll(selectedFiles); Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID); - boolean isMajor = - (filesToCompact.getFilesToCompact().size() == this.getStorefilesCount()); - if (isMajor) { - // since we're enqueuing a major, update the compaction wait interval - this.forceMajor = false; - } - - LOG.debug(getRegionInfo().getEncodedName() + " - " + - getColumnFamilyName() + ": Initiating " + - (isMajor ? "major" : "minor") + " compaction"); - - // everything went better than expected. create a compaction request - int pri = getCompactPriority(priority); - //not a special compaction request, so we need to make one - if(request == null){ - request = new CompactionRequest(region, this, filesToCompact, isMajor, pri); - }else{ - //update the request with what the system thinks the request should be - //its up to the request if it wants to listen - request.setSelection(filesToCompact); - request.setIsMajor(isMajor); - request.setPriority(pri); - } + // If we're enqueuing a major, clear the force flag. + boolean isMajor = selectedFiles.size() == this.getStorefilesCount(); + this.forceMajor = this.forceMajor && !isMajor; + + // Set common request properties. + compaction.getRequest().setPriority(getCompactPriority(priority)); + compaction.getRequest().setIsMajor(isMajor); + compaction.getRequest().setDescription( + region.getRegionNameAsString(), getColumnFamilyName()); } } finally { this.lock.readLock().unlock(); } - if (request != null) { - this.region.reportCompactionRequestStart(request.isMajor()); - } - return request; + + LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating " + + (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction"); + this.region.reportCompactionRequestStart(compaction.getRequest().isMajor()); + return compaction; + } + + public void cancelRequestedCompaction(CompactionContext compaction) { + finishCompactionRequest(compaction.getRequest()); } - public void finishRequest(CompactionRequest cr) { + private void finishCompactionRequest(CompactionRequest cr) { this.region.reportCompactionRequestEnd(cr.isMajor()); - if (cr.getCompactSelection().isOffPeakCompaction()) { + if (cr.isOffPeak()) { this.offPeakCompactions.endOffPeakRequest(); - cr.getCompactSelection().setOffPeak(false); + cr.setOffPeak(false); } synchronized (filesCompacting) { filesCompacting.removeAll(cr.getFiles()); @@ -1363,28 +1398,8 @@ public class HStore implements Store { * @return StoreFile created. May be null. * @throws IOException */ - StoreFile completeCompaction(final Collection compactedFiles, - final Path newFile) - throws IOException { - // 1. Moving the new files into place -- if there is a new file (may not - // be if all cells were expired or deleted). - StoreFile result = null; - if (newFile != null) { - validateStoreFile(newFile); - // Move the file into the right spot - Path destPath = new Path(homedir, newFile.getName()); - LOG.info("Renaming compacted file at " + newFile + " to " + destPath); - if (!fs.rename(newFile, destPath)) { - LOG.error("Failed move of compacted file " + newFile + " to " + - destPath); - throw new IOException("Failed move of compacted file " + newFile + - " to " + destPath); - } - result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf, - this.family.getBloomFilterType(), this.dataBlockEncoder); - result.createReader(); - } - + private void completeCompaction(final Collection compactedFiles, + final Collection result) throws IOException { try { this.lock.writeLock().lock(); try { @@ -1392,11 +1407,7 @@ public class HStore implements Store { // delete old store files until we have sent out notification of // change in case old files are still being accessed by outstanding // scanners. - List results = new ArrayList(1); - if (result != null) { - results.add(result); - } - this.storeFileManager.addCompactionResults(compactedFiles, results); + this.storeFileManager.addCompactionResults(compactedFiles, result); filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock() } finally { // We need the lock, as long as we are updating the storeFiles @@ -1418,8 +1429,8 @@ public class HStore implements Store { } catch (IOException e) { e = RemoteExceptionHandler.checkIOException(e); LOG.error("Failed replacing compacted files in " + this + - ". Compacted file is " + (result == null? "none": result.toString()) + - ". Files replaced " + compactedFiles.toString() + + ". Compacted files are " + (result == null? "none": result.toString()) + + ". Files replaced " + compactedFiles.toString() + " some of which may have been already removed", e); } @@ -1435,7 +1446,6 @@ public class HStore implements Store { this.storeSize += r.length(); this.totalUncompressedBytes += r.getTotalUncompressedBytes(); } - return result; } /* Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Tue Feb 26 20:51:59 2013 @@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.coprocess import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Feb 26 20:51:59 2013 @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.HeapSi import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; @@ -157,12 +158,14 @@ public interface Store extends HeapSize, */ public CompactionProgress getCompactionProgress(); - public CompactionRequest requestCompaction() throws IOException; + public CompactionContext requestCompaction() throws IOException; - public CompactionRequest requestCompaction(int priority, CompactionRequest request) + public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest) throws IOException; - public void finishRequest(CompactionRequest cr); + public void cancelRequestedCompaction(CompactionContext compaction); + + public List compact(CompactionContext compaction) throws IOException; /** * @return true if we should run a major compaction. Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java Tue Feb 26 20:51:59 2013 @@ -24,8 +24,11 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy; +import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.util.ReflectionUtils; /** @@ -34,14 +37,15 @@ import org.apache.hadoop.hbase.util.Refl * they are tied together and replaced together via StoreEngine-s. */ @InterfaceAudience.Private -public abstract class StoreEngine { +public abstract class StoreEngine< + CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> { protected final Store store; protected final Configuration conf; protected final KVComparator comparator; - private final PP compactionPolicy = new PP(); - private final PP compactor = new PP(); - private final PP storeFileManager = new PP(); + protected CP compactionPolicy; + protected C compactor; + protected SFM storeFileManager; private boolean isInitialized = false; /** @@ -50,7 +54,7 @@ public abstract class StoreEngine { */ public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class"; - private static final Class + private static final Class> DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class; /** @@ -58,7 +62,7 @@ public abstract class StoreEngine { */ public CompactionPolicy getCompactionPolicy() { createComponentsOnce(); - return this.compactionPolicy.get(); + return this.compactionPolicy; } /** @@ -66,7 +70,7 @@ public abstract class StoreEngine { */ public Compactor getCompactor() { createComponentsOnce(); - return this.compactor.get(); + return this.compactor; } /** @@ -74,7 +78,7 @@ public abstract class StoreEngine { */ public StoreFileManager getStoreFileManager() { createComponentsOnce(); - return this.storeFileManager.get(); + return this.storeFileManager; } protected StoreEngine(Configuration conf, Store store, KVComparator comparator) { @@ -83,18 +87,22 @@ public abstract class StoreEngine { this.comparator = comparator; } + public CompactionContext createCompaction() { + createComponentsOnce(); + return this.createCompactionContext(); + } + + protected abstract CompactionContext createCompactionContext(); + /** * Create the StoreEngine's components. - * @param storeFileManager out parameter for StoreFileManager. - * @param compactionPolicy out parameter for CompactionPolicy. - * @param compactor out parameter for Compactor. */ - protected abstract void createComponents(PP storeFileManager, - PP compactionPolicy, PP compactor); + protected abstract void createComponents(); private void createComponentsOnce() { if (isInitialized) return; - createComponents(storeFileManager, compactionPolicy, compactor); + createComponents(); + assert compactor != null && compactionPolicy != null && storeFileManager != null; isInitialized = true; } @@ -117,18 +125,4 @@ public abstract class StoreEngine { throw new IOException("Unable to load configured store engine '" + className + "'", e); } } - - /** - * To allow StoreEngine-s to have custom dependencies between 3 components, we want to create - * them in one place. To return multiple, simulate C++ pointer to pointers/C# out params. - */ - protected static class PP { - private T t = null; - public void set(T t) { - this.t = t; - } - public T get() { - return this.t; - } - } } Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java Tue Feb 26 20:51:59 2013 @@ -39,7 +39,7 @@ import com.google.common.collect.Immutab * Implementations are assumed to be not thread safe. */ @InterfaceAudience.Private -interface StoreFileManager { +public interface StoreFileManager { /** * Loads the initial store files into empty StoreFileManager. * @param storeFiles The files to load. @@ -56,7 +56,6 @@ interface StoreFileManager { * Adds compaction results into the structure. * @param compactedFiles The input files for the compaction. * @param results The resulting files for the compaction. - * @return The files that can be removed from storage. Usually, */ public abstract void addCompactionResults( Collection compactedFiles, Collection results); Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java?rev=1450407&view=auto ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java (added) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java Tue Feb 26 20:51:59 2013 @@ -0,0 +1,80 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.regionserver.StoreFile; + + +/** + * This class holds all "physical" details necessary to run a compaction. + * It also has compaction request with all the logical details. + * Hence, this class is basically the compaction. + */ +@InterfaceAudience.Private +public abstract class CompactionContext { + protected CompactionRequest request = null; + + /** + * Called before coprocessor preCompactSelection and should filter the candidates + * for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time. + * @param filesCompacting files currently compacting + * @return the list of files that can theoretically be compacted. + */ + public abstract List preSelect(final List filesCompacting); + + /** + * Called to select files for compaction. Must fill in the request field if successful. + * @param filesCompacting Files currently being compacted by other compactions. + * @param isUserCompaction Whether this is a user compaction. + * @param mayUseOffPeak Whether the underlying policy may assume it's off-peak hours. + * @param forceMajor Whether to force major compaction. + * @return Whether the selection succeeded. Selection may be empty and lead to no compaction. + */ + public abstract boolean select( + final List filesCompacting, final boolean isUserCompaction, + final boolean mayUseOffPeak, final boolean forceMajor) throws IOException; + + /** + * Forces external selection to be applied for this compaction. + * @param request The pre-cooked request with selection and other settings. + */ + public void forceSelect(CompactionRequest request) { + this.request = request; + } + + /** + * Runs the compaction based on current selection. select/forceSelect must have been called. + * @return The new file paths resulting from compaction. + */ + public abstract List compact() throws IOException; + + public CompactionRequest getRequest() { + assert hasSelection(); + return this.request; + } + + public boolean hasSelection() { + return this.request != null; + } +} Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Tue Feb 26 20:51:59 2013 @@ -43,25 +43,6 @@ public abstract class CompactionPolicy { } /** - * This is called before coprocessor preCompactSelection and should filter the candidates - * for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time. - * @param candidateFiles candidate files, ordered from oldest to newest - * @param filesCompacting files currently compacting - * @return the list of files that can theoretically be compacted. - */ - public abstract List preSelectCompaction( - List candidateFiles, final List filesCompacting); - - /** - * @param candidateFiles candidate files, ordered from oldest to newest - * @return subset copy of candidate list that meets compaction criteria - * @throws java.io.IOException - */ - public abstract CompactSelection selectCompaction( - final List candidateFiles, final boolean isUserCompaction, - final boolean mayUseOffPeak, final boolean forceMajor) throws IOException; - - /** * @param storeFiles Store files in the store. * @return The system compaction priority of the store, based on storeFiles. * The priority range is as such - the smaller values are higher priority; Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Feb 26 20:51:59 2013 @@ -18,22 +18,13 @@ */ package org.apache.hadoop.hbase.regionserver.compactions; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.RemoteExceptionHandler; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -46,231 +37,193 @@ import com.google.common.base.Predicate; import com.google.common.collect.Collections2; /** - * This class holds all details necessary to run a compaction. + * This class holds all logical details necessary to run a compaction. */ @InterfaceAudience.LimitedPrivate({ "coprocessor" }) @InterfaceStability.Evolving -public class CompactionRequest implements Comparable, - Runnable { - static final Log LOG = LogFactory.getLog(CompactionRequest.class); - private final HRegion region; - private final HStore store; - private CompactSelection compactSelection; - private long totalSize; - private boolean isMajor; - private int priority; - private final Long timeInNanos; - private HRegionServer server = null; - - public static CompactionRequest getRequestForTesting(Collection selection, - boolean isMajor) { - return new CompactionRequest(null, null, new CompactSelection(new ArrayList( - selection)), isMajor, 0, System.nanoTime()); - } - - /** - * Constructor for a custom compaction. Uses the setXXX methods to update the state of the - * compaction before being used. - */ - public CompactionRequest(HRegion region, HStore store, int priority) { - this(region, store, null, false, priority, System - .nanoTime()); - } - - public CompactionRequest(HRegion r, HStore s, CompactSelection files, boolean isMajor, int p) { - // delegate to the internal constructor after checking basic preconditions - this(Preconditions.checkNotNull(r), s, Preconditions.checkNotNull(files), isMajor, p, System - .nanoTime()); - } - - private CompactionRequest(HRegion region, HStore store, CompactSelection files, boolean isMajor, - int priority, long startTime) { - this.region = region; - this.store = store; - this.isMajor = isMajor; - this.priority = priority; - this.timeInNanos = startTime; - if (files != null) { - this.setSelection(files); - } - } - - /** - * This function will define where in the priority queue the request will - * end up. Those with the highest priorities will be first. When the - * priorities are the same it will first compare priority then date - * to maintain a FIFO functionality. - * - *

Note: The date is only accurate to the millisecond which means it is - * possible that two requests were inserted into the queue within a - * millisecond. When that is the case this function will break the tie - * arbitrarily. - */ - @Override - public int compareTo(CompactionRequest request) { - //NOTE: The head of the priority queue is the least element - if (this.equals(request)) { - return 0; //they are the same request - } - int compareVal; - - compareVal = priority - request.priority; //compare priority - if (compareVal != 0) { - return compareVal; - } - - compareVal = timeInNanos.compareTo(request.timeInNanos); - if (compareVal != 0) { - return compareVal; - } - - // break the tie based on hash code - return this.hashCode() - request.hashCode(); - } - - @Override - public boolean equals(Object obj) { - return (this == obj); - } - - /** Gets the HRegion for the request */ - public HRegion getHRegion() { - return region; - } - - /** Gets the Store for the request */ - public HStore getStore() { - return store; - } - - /** Gets the compact selection object for the request */ - public CompactSelection getCompactSelection() { - return compactSelection; - } - - /** Gets the StoreFiles for the request */ - public List getFiles() { - return compactSelection.getFilesToCompact(); - } - - /** Gets the total size of all StoreFiles in compaction */ - public long getSize() { - return totalSize; - } - - public boolean isMajor() { - return this.isMajor; - } - - /** Gets the priority for the request */ - public int getPriority() { - return priority; - } - - public long getSelectionTime() { - return compactSelection.getSelectionTime(); - } - - /** Gets the priority for the request */ - public void setPriority(int p) { - this.priority = p; - } - - public void setServer(HRegionServer hrs) { - this.server = hrs; - } - - /** - * Set the files (and, implicitly, the size of the compaction based on those files) - * @param files files that should be included in the compaction - */ - public void setSelection(CompactSelection files) { - long sz = 0; - for (StoreFile sf : files.getFilesToCompact()) { - sz += sf.getReader().length(); - } - this.totalSize = sz; - this.compactSelection = files; - } - - /** - * Specify if this compaction should be a major compaction based on the state of the store - * @param isMajor true if the system determines that this compaction should be a major - * compaction - */ - public void setIsMajor(boolean isMajor) { - this.isMajor = isMajor; - } - - @Override - public String toString() { - String fsList = Joiner.on(", ").join( - Collections2.transform(Collections2.filter( - compactSelection.getFilesToCompact(), - new Predicate() { - public boolean apply(StoreFile sf) { - return sf.getReader() != null; - } - }), new Function() { - public String apply(StoreFile sf) { - return StringUtils.humanReadableInt(sf.getReader().length()); +public class CompactionRequest implements Comparable { + static final Log LOG = LogFactory.getLog(CompactionRequest.class); + // was this compaction promoted to an off-peak + private boolean isOffPeak = false; + private boolean isMajor = false; + private int priority = Store.NO_PRIORITY; + private Collection filesToCompact; + + // CompactRequest object creation time. + private long selectionTime; + // System time used to compare objects in FIFO order. TODO: maybe use selectionTime? + private Long timeInNanos; + private String regionName = ""; + private String storeName = ""; + private long totalSize = -1L; + + /** + * This ctor should be used by coprocessors that want to subclass CompactionRequest. + */ + public CompactionRequest() { + this.selectionTime = EnvironmentEdgeManager.currentTimeMillis(); + this.timeInNanos = System.nanoTime(); + } + + public CompactionRequest(Collection files) { + this(); + Preconditions.checkNotNull(files); + this.filesToCompact = files; + recalculateSize(); + } + + /** + * Called before compaction is executed by CompactSplitThread; for use by coproc subclasses. + */ + public void beforeExecute() {} + + /** + * Called after compaction is executed by CompactSplitThread; for use by coproc subclasses. + */ + public void afterExecute() {} + + /** + * Combines the request with other request. Coprocessors subclassing CR may override + * this if they want to do clever things based on CompactionPolicy selection that + * is passed to this method via "other". The default implementation just does a copy. + * @param other Request to combine with. + * @return The result (may be "this" or "other"). + */ + public CompactionRequest combineWith(CompactionRequest other) { + this.filesToCompact = new ArrayList(other.getFiles()); + this.isOffPeak = other.isOffPeak; + this.isMajor = other.isMajor; + this.priority = other.priority; + this.selectionTime = other.selectionTime; + this.timeInNanos = other.timeInNanos; + this.regionName = other.regionName; + this.storeName = other.storeName; + this.totalSize = other.totalSize; + return this; + } + + /** + * This function will define where in the priority queue the request will + * end up. Those with the highest priorities will be first. When the + * priorities are the same it will first compare priority then date + * to maintain a FIFO functionality. + * + *

Note: The date is only accurate to the millisecond which means it is + * possible that two requests were inserted into the queue within a + * millisecond. When that is the case this function will break the tie + * arbitrarily. + */ + @Override + public int compareTo(CompactionRequest request) { + //NOTE: The head of the priority queue is the least element + if (this.equals(request)) { + return 0; //they are the same request + } + int compareVal; + + compareVal = priority - request.priority; //compare priority + if (compareVal != 0) { + return compareVal; + } + + compareVal = timeInNanos.compareTo(request.timeInNanos); + if (compareVal != 0) { + return compareVal; + } + + // break the tie based on hash code + return this.hashCode() - request.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return (this == obj); + } + + public Collection getFiles() { + return this.filesToCompact; + } + + /** + * Sets the region/store name, for logging. + */ + public void setDescription(String regionName, String storeName) { + this.regionName = regionName; + this.storeName = storeName; + } + + /** Gets the total size of all StoreFiles in compaction */ + public long getSize() { + return totalSize; + } + + public boolean isMajor() { + return this.isMajor; + } + + /** Gets the priority for the request */ + public int getPriority() { + return priority; + } + + /** Sets the priority for the request */ + public void setPriority(int p) { + this.priority = p; + } + + public boolean isOffPeak() { + return this.isOffPeak; + } + + public void setOffPeak(boolean value) { + this.isOffPeak = value; + } + + public long getSelectionTime() { + return this.selectionTime; + } + + /** + * Specify if this compaction should be a major compaction based on the state of the store + * @param isMajor true if the system determines that this compaction should be a major + * compaction + */ + public void setIsMajor(boolean isMajor) { + this.isMajor = isMajor; + } + + @Override + public String toString() { + String fsList = Joiner.on(", ").join( + Collections2.transform(Collections2.filter( + this.getFiles(), + new Predicate() { + public boolean apply(StoreFile sf) { + return sf.getReader() != null; } - })); - - return "regionName=" + region.getRegionNameAsString() + - ", storeName=" + new String(store.getFamily().getName()) + - ", fileCount=" + compactSelection.getFilesToCompact().size() + - ", fileSize=" + StringUtils.humanReadableInt(totalSize) + - ((fsList.isEmpty()) ? "" : " (" + fsList + ")") + - ", priority=" + priority + ", time=" + timeInNanos; - } - - @Override - public void run() { - Preconditions.checkNotNull(server); - if (server.isStopped()) { - return; - } - try { - long start = EnvironmentEdgeManager.currentTimeMillis(); - boolean completed = region.compact(this); - long now = EnvironmentEdgeManager.currentTimeMillis(); - LOG.info(((completed) ? "completed" : "aborted") + " compaction: " + - this + "; duration=" + StringUtils.formatTimeDiff(now, start)); - if (completed) { - // degenerate case: blocked regions require recursive enqueues - if (store.getCompactPriority() <= 0) { - server.compactSplitThread.requestCompaction(region, store, "Recursive enqueue", null); - } else { - // see if the compaction has caused us to exceed max region size - server.getCompactSplitThread().requestSplit(region); + }), new Function() { + public String apply(StoreFile sf) { + return StringUtils.humanReadableInt(sf.getReader().length()); } - } - } catch (IOException ex) { - LOG.error("Compaction failed " + this, RemoteExceptionHandler - .checkIOException(ex)); - server.checkFileSystem(); - } catch (Exception ex) { - LOG.error("Compaction failed " + this, ex); - server.checkFileSystem(); - } finally { - store.finishRequest(this); - LOG.debug("CompactSplitThread Status: " + server.compactSplitThread); - } - } + })); - /** - * Cleanup class to use when rejecting a compaction request from the queue. - */ - public static class Rejection implements RejectedExecutionHandler { - - @Override - public void rejectedExecution(Runnable request, ThreadPoolExecutor pool) { - if (request instanceof CompactionRequest) { - CompactionRequest cr = (CompactionRequest) request; - LOG.debug("Compaction Rejected: " + cr); - cr.getStore().finishRequest(cr); - } - } + return "regionName=" + regionName + ", storeName=" + storeName + + ", fileCount=" + this.getFiles().size() + + ", fileSize=" + StringUtils.humanReadableInt(totalSize) + + ((fsList.isEmpty()) ? "" : " (" + fsList + ")") + + ", priority=" + priority + ", time=" + timeInNanos; + } + + /** + * Recalculate the size of the compaction based on current files. + * @param files files that should be included in the compaction + */ + private void recalculateSize() { + long sz = 0; + for (StoreFile sf : this.filesToCompact) { + sz += sf.getReader().length(); } + this.totalSize = sz; + } } + Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Tue Feb 26 20:51:59 2013 @@ -60,7 +60,9 @@ public abstract class Compactor { */ public List compactForTesting(final Collection filesToCompact, boolean isMajor) throws IOException { - return compact(CompactionRequest.getRequestForTesting(filesToCompact, isMajor)); + CompactionRequest cr = new CompactionRequest(filesToCompact); + cr.setIsMajor(isMajor); + return this.compact(cr); } public CompactionProgress getProgress() { Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java Tue Feb 26 20:51:59 2013 @@ -32,9 +32,11 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileManager; import org.apache.hadoop.hbase.regionserver.StoreUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -49,16 +51,15 @@ import com.google.common.collect.Collect */ @InterfaceAudience.Private public class DefaultCompactionPolicy extends CompactionPolicy { - private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class); public DefaultCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) { super(conf, storeConfigInfo); } - @Override - public List preSelectCompaction( - List candidateFiles, final List filesCompacting) { + + private ArrayList getCurrentEligibleFiles( + ArrayList candidateFiles, final List filesCompacting) { // candidates = all storefiles not already in compaction queue if (!filesCompacting.isEmpty()) { // exclude all files older than the newest file we're currently @@ -71,6 +72,11 @@ public class DefaultCompactionPolicy ext return candidateFiles; } + public List preSelectCompactionForCoprocessor( + final Collection candidates, final List filesCompacting) { + return getCurrentEligibleFiles(new ArrayList(candidates), filesCompacting); + } + @Override public int getSystemCompactionPriority(final Collection storeFiles) { return this.comConf.getBlockingStorefileCount() - storeFiles.size(); @@ -81,20 +87,20 @@ public class DefaultCompactionPolicy ext * @return subset copy of candidate list that meets compaction criteria * @throws java.io.IOException */ - @Override - public CompactSelection selectCompaction(List candidateFiles, - final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor) - throws IOException { + public CompactionRequest selectCompaction(Collection candidateFiles, + final List filesCompacting, final boolean isUserCompaction, + final boolean mayUseOffPeak, final boolean forceMajor) throws IOException { // Preliminary compaction subject to filters - CompactSelection candidateSelection = new CompactSelection(candidateFiles); + ArrayList candidateSelection = new ArrayList(candidateFiles); + candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting); long cfTtl = this.storeConfigInfo.getStoreFileTtl(); if (!forceMajor) { // If there are expired files, only select them so that compaction deletes them if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) { - CompactSelection expiredSelection = selectExpiredStoreFiles( - candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl); + ArrayList expiredSelection = selectExpiredStoreFiles( + candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl); if (expiredSelection != null) { - return expiredSelection; + return new CompactionRequest(expiredSelection); } } candidateSelection = skipLargeFiles(candidateSelection); @@ -106,21 +112,23 @@ public class DefaultCompactionPolicy ext // Or, if there are any references among the candidates. boolean majorCompaction = ( (forceMajor && isUserCompaction) - || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact())) - && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact())) - || StoreUtils.hasReferences(candidateSelection.getFilesToCompact()) + || ((forceMajor || isMajorCompaction(candidateSelection)) + && (candidateSelection.size() < comConf.getMaxFilesToCompact())) + || StoreUtils.hasReferences(candidateSelection) ); if (!majorCompaction) { // we're doing a minor compaction, let's see what files are applicable - candidateSelection.setOffPeak(mayUseOffPeak); candidateSelection = filterBulk(candidateSelection); - candidateSelection = applyCompactionPolicy(candidateSelection); + candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak); candidateSelection = checkMinFilesCriteria(candidateSelection); } - candidateSelection = - removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction); - return candidateSelection; + candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction); + CompactionRequest result = new CompactionRequest(candidateSelection); + if (!majorCompaction && !candidateSelection.isEmpty()) { + result.setOffPeak(mayUseOffPeak); + } + return result; } /** @@ -133,33 +141,25 @@ public class DefaultCompactionPolicy ext * @return A CompactSelection contains the expired store files as * filesToCompact */ - private CompactSelection selectExpiredStoreFiles( - CompactSelection candidates, long maxExpiredTimeStamp) { - List filesToCompact = candidates.getFilesToCompact(); - if (filesToCompact == null || filesToCompact.size() == 0) - return null; + private ArrayList selectExpiredStoreFiles( + ArrayList candidates, long maxExpiredTimeStamp) { + if (candidates == null || candidates.size() == 0) return null; ArrayList expiredStoreFiles = null; - boolean hasExpiredStoreFiles = false; - CompactSelection expiredSFSelection = null; - for (StoreFile storeFile : filesToCompact) { + for (StoreFile storeFile : candidates) { if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) { LOG.info("Deleting the expired store file by compaction: " + storeFile.getPath() + " whose maxTimeStamp is " + storeFile.getReader().getMaxTimestamp() + " while the max expired timestamp is " + maxExpiredTimeStamp); - if (!hasExpiredStoreFiles) { + if (expiredStoreFiles == null) { expiredStoreFiles = new ArrayList(); - hasExpiredStoreFiles = true; } expiredStoreFiles.add(storeFile); } } - if (hasExpiredStoreFiles) { - expiredSFSelection = new CompactSelection(expiredStoreFiles); - } - return expiredSFSelection; + return expiredStoreFiles; } /** @@ -168,18 +168,16 @@ public class DefaultCompactionPolicy ext * exclude all files above maxCompactSize * Also save all references. We MUST compact them */ - private CompactSelection skipLargeFiles(CompactSelection candidates) { + private ArrayList skipLargeFiles(ArrayList candidates) { int pos = 0; - while (pos < candidates.getFilesToCompact().size() && - candidates.getFilesToCompact().get(pos).getReader().length() > - comConf.getMaxCompactSize() && - !candidates.getFilesToCompact().get(pos).isReference()) { + while (pos < candidates.size() && !candidates.get(pos).isReference() + && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) { ++pos; } if (pos > 0) { LOG.debug("Some files are too large. Excluding " + pos + " files from compaction candidates"); - candidates.clearSubList(0, pos); + candidates.subList(0, pos).clear(); } return candidates; } @@ -189,9 +187,8 @@ public class DefaultCompactionPolicy ext * @return filtered subset * exclude all bulk load files if configured */ - private CompactSelection filterBulk(CompactSelection candidates) { - candidates.getFilesToCompact().removeAll(Collections2.filter( - candidates.getFilesToCompact(), + private ArrayList filterBulk(ArrayList candidates) { + candidates.removeAll(Collections2.filter(candidates, new Predicate() { @Override public boolean apply(StoreFile input) { @@ -206,9 +203,9 @@ public class DefaultCompactionPolicy ext * @return filtered subset * take upto maxFilesToCompact from the start */ - private CompactSelection removeExcessFiles(CompactSelection candidates, + private ArrayList removeExcessFiles(ArrayList candidates, boolean isUserCompaction, boolean isMajorCompaction) { - int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact(); + int excess = candidates.size() - comConf.getMaxFilesToCompact(); if (excess > 0) { if (isMajorCompaction && isUserCompaction) { LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + @@ -216,8 +213,7 @@ public class DefaultCompactionPolicy ext } else { LOG.debug("Too many admissible files. Excluding " + excess + " files from compaction candidates"); - candidates.clearSubList(comConf.getMaxFilesToCompact(), - candidates.getFilesToCompact().size()); + candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear(); } } return candidates; @@ -227,16 +223,14 @@ public class DefaultCompactionPolicy ext * @return filtered subset * forget the compactionSelection if we don't have enough files */ - private CompactSelection checkMinFilesCriteria(CompactSelection candidates) { + private ArrayList checkMinFilesCriteria(ArrayList candidates) { int minFiles = comConf.getMinFilesToCompact(); - if (candidates.getFilesToCompact().size() < minFiles) { + if (candidates.size() < minFiles) { if(LOG.isDebugEnabled()) { - LOG.debug("Not compacting files because we only have " + - candidates.getFilesToCompact().size() + - " files ready for compaction. Need " + minFiles + " to initiate."); + LOG.debug("Not compacting files because we only have " + candidates.size() + + " files ready for compaction. Need " + minFiles + " to initiate."); } - candidates.emptyFileList(); - candidates.setOffPeak(false); + candidates.clear(); } return candidates; } @@ -271,25 +265,26 @@ public class DefaultCompactionPolicy ext * | | | | | | | | | | | | * | | | | | | | | | | | | */ - CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException { - if (candidates.getFilesToCompact().isEmpty()) { + ArrayList applyCompactionPolicy( + ArrayList candidates, boolean mayUseOffPeak) throws IOException { + if (candidates.isEmpty()) { return candidates; } // we're doing a minor compaction, let's see what files are applicable int start = 0; double ratio = comConf.getCompactionRatio(); - if (candidates.isOffPeakCompaction()) { + if (mayUseOffPeak) { ratio = comConf.getCompactionRatioOffPeak(); LOG.info("Running an off-peak compaction, selection ratio = " + ratio); } // get store file sizes for incremental compacting selection. - int countOfFiles = candidates.getFilesToCompact().size(); + final int countOfFiles = candidates.size(); long[] fileSizes = new long[countOfFiles]; long[] sumSize = new long[countOfFiles]; for (int i = countOfFiles - 1; i >= 0; --i) { - StoreFile file = candidates.getFilesToCompact().get(i); + StoreFile file = candidates.get(i); fileSizes[i] = file.getReader().length(); // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo int tooFar = i + comConf.getMaxFilesToCompact() - 1; @@ -309,8 +304,9 @@ public class DefaultCompactionPolicy ext + " files from " + countOfFiles + " candidates"); } - candidates = candidates.getSubList(start, countOfFiles); - + if (start > 0) { + candidates.subList(0, start).clear(); + } return candidates; } Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1450407&r1=1450406&r2=1450407&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Feb 26 20:51:59 2013 @@ -594,7 +594,7 @@ public class TestCompaction extends HBas HStore store = (HStore) r.getStore(COLUMN_FAMILY); Collection storeFiles = store.getStorefiles(); - Compactor tool = store.compactor; + Compactor tool = store.storeEngine.getCompactor(); List newFiles = tool.compactForTesting(storeFiles, false); @@ -611,7 +611,7 @@ public class TestCompaction extends HBas stream.close(); try { - store.completeCompaction(storeFiles, origPath); + ((HStore)store).moveFileIntoPlace(origPath); } catch (Exception e) { // The complete compaction should fail and the corrupt file should remain // in the 'tmp' directory; @@ -635,7 +635,7 @@ public class TestCompaction extends HBas } store.triggerMajorCompaction(); - CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null); + CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest(); assertNotNull("Expected to receive a compaction request", request); assertEquals( "System-requested major compaction should not occur if there are too many store files", @@ -653,7 +653,7 @@ public class TestCompaction extends HBas createStoreFile(r); } store.triggerMajorCompaction(); - CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null); + CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest(); assertNotNull("Expected to receive a compaction request", request); assertEquals( "User-requested major compaction should always occur, even if there are too many store files", @@ -680,7 +680,7 @@ public class TestCompaction extends HBas } CountDownLatch latch = new CountDownLatch(1); - TrackableCompactionRequest request = new TrackableCompactionRequest(r, (HStore) store, latch); + TrackableCompactionRequest request = new TrackableCompactionRequest(latch); thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request); // wait for the latch to complete. latch.await(); @@ -698,16 +698,15 @@ public class TestCompaction extends HBas * Constructor for a custom compaction. Uses the setXXX methods to update the state of the * compaction before being used. */ - public TrackableCompactionRequest(HRegion region, HStore store, CountDownLatch finished) { - super(region, store, Store.PRIORITY_USER); + public TrackableCompactionRequest(CountDownLatch finished) { + super(); this.done = finished; } @Override - public void run() { - super.run(); + public void afterExecute() { + super.afterExecute(); this.done.countDown(); } } - }