accumulo-commits mailing list archives

From e..@apache.org
Subject git commit: ACCUMULO-1451 making major compaction configurable
Date Thu, 17 Oct 2013 18:33:31 GMT
Updated Branches:
  refs/heads/master 611d09c58 -> 4c3da02bc


ACCUMULO-1451 making major compaction configurable


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4c3da02b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4c3da02b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4c3da02b

Branch: refs/heads/master
Commit: 4c3da02bcdb14caec648716679317cac837789cf
Parents: 611d09c
Author: Eric Newton <eric.newton@gmail.com>
Authored: Wed Oct 16 20:06:59 2013 -0400
Committer: Eric Newton <eric.newton@gmail.com>
Committed: Thu Oct 17 14:33:46 2013 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   1 +
 .../minicluster/MiniAccumuloClusterGCTest.java  |   2 +-
 .../accumulo/server/tabletserver/Compactor.java |  14 +-
 .../accumulo/server/tabletserver/Tablet.java    | 166 +++---------
 .../server/tabletserver/TabletServer.java       |   2 +-
 .../TabletServerResourceManager.java            | 126 ++-------
 .../tabletserver/compaction/CompactionPass.java |  18 ++
 .../tabletserver/compaction/CompactionPlan.java |  28 ++
 .../compaction/CompactionStrategy.java          |  43 +++
 .../compaction/DefaultCompactionStrategy.java   | 196 ++++++++++++++
 .../tabletserver/compaction/DefaultWriter.java  |  17 ++
 .../compaction/MajorCompactionReason.java       |  27 ++
 .../compaction/MajorCompactionRequest.java      |  85 ++++++
 .../DefaultCompactionStrategyTest.java          | 263 +++++++++++++++++++
 14 files changed, 748 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 3adfd4e..8542a38 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -366,6 +366,7 @@ public enum Property {
   TABLE_INTERPRETER_CLASS("table.interepreter", DefaultScanInterpreter.class.getName(), PropertyType.STRING,
       "The ScanInterpreter class to apply on scan arguments in the shell"),
   TABLE_CLASSPATH("table.classpath.context", "", PropertyType.STRING, "Per table classpath context"),
+  TABLE_COMPACTION_STRATEGY("table.majc.compaction.strategy", "org.apache.accumulo.server.tabletserver.compaction.DefaultCompactionStrategy", PropertyType.CLASSNAME, "A customizable major compaction strategy."),
 
   // VFS ClassLoader properties
   VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY(AccumuloVFSClassLoader.VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY, "", PropertyType.STRING,

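The new property can be set per table through the normal client API. A minimal sketch, assuming a hypothetical table name, strategy class, and connection details (the instance setup mirrors the MiniAccumuloClusterGCTest below):

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;

    public class SetCompactionStrategyExample {
      public static void main(String[] args) throws Exception {
        // hypothetical instance name, ZooKeeper quorum, and credentials
        Connector c = new ZooKeeperInstance("instance", "zkhost:2181")
            .getConnector("root", new PasswordToken("secret"));
        // point the table at a custom strategy; the class must be on the
        // tablet server classpath (or in a table classpath context)
        c.tableOperations().setProperty("mytable",
            "table.majc.compaction.strategy", "com.example.MyCompactionStrategy");
      }
    }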
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterGCTest.java
----------------------------------------------------------------------
diff --git a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterGCTest.java b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterGCTest.java
index 1b2481b..e441a42 100644
--- a/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterGCTest.java
+++ b/minicluster/src/test/java/org/apache/accumulo/minicluster/MiniAccumuloClusterGCTest.java
@@ -76,7 +76,7 @@ public class MiniAccumuloClusterGCTest {
     accumulo.stop();
   }
 
-  @Test(timeout = 20000)
+  @Test(timeout = 40 * 1000)
   public void testFilesAreGarbageCollected() throws Exception {
     ZooKeeperInstance inst = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
     Connector c = inst.getConnector("root", new PasswordToken(passwd));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
index 875a332..760b6e0 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
@@ -62,8 +62,11 @@ import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReportingIterator;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
-import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
 import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.server.tabletserver.compaction.CompactionStrategy;
+import org.apache.accumulo.server.tabletserver.compaction.CompactionStrategy.Writer;
+import org.apache.accumulo.server.tabletserver.compaction.DefaultWriter;
+import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionReason;
 import org.apache.accumulo.trace.instrument.Span;
 import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.hadoop.conf.Configuration;
@@ -162,6 +165,7 @@ public class Compactor implements Callable<CompactionStats> {
   private long compactorID = nextCompactorID.getAndIncrement();
 
   protected volatile Thread thread;
+  private Writer writer;
 
   private synchronized void setLocalityGroup(String name) {
     this.currentLocalityGroup = name;
@@ -285,7 +289,7 @@ public class Compactor implements Callable<CompactionStats> {
   }
 
   Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
-      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) {
+      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason, CompactionStrategy.Writer writer) {
     this.extent = extent;
     this.conf = conf;
     this.fs = fs;
@@ -297,13 +301,14 @@ public class Compactor implements Callable<CompactionStats> {
     this.env = env;
     this.iterators = iterators;
     this.reason = reason;
+    this.writer = writer;
     
     startTime = System.currentTimeMillis();
   }
   
   Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
       TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
-    this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null);
+    this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null, new DefaultWriter());
   }
   
   public VolumeManager getFileSystem() {
@@ -461,6 +466,7 @@ public class Compactor implements Callable<CompactionStats> {
   private void compactLocalityGroup(String lgName, Set<ByteSequence> columnFamilies, boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats)
       throws IOException, CompactionCanceledException {
     ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(filesToCompact.size());
+    List<FileSKVWriter> writers = Collections.singletonList(mfw);
     Span span = Trace.start("compact");
     try {
       long entriesCompacted = 0;
@@ -499,7 +505,7 @@ public class Compactor implements Callable<CompactionStats> {
       Span write = Trace.start("write");
       try {
         while (itr.hasTop() && env.isCompactionEnabled()) {
-          mfw.append(itr.getTopKey(), itr.getTopValue());
+          writer.write(itr.getTopKey(), itr.getTopValue(), writers);
           itr.next();
           entriesCompacted++;
           

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index 90542e5..3b29e6c 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -116,6 +116,10 @@ import org.apache.accumulo.server.tabletserver.InMemoryMap.MemoryIterator;
 import org.apache.accumulo.server.tabletserver.TabletServer.TservConstraintEnv;
 import org.apache.accumulo.server.tabletserver.TabletServerResourceManager.TabletResourceManager;
 import org.apache.accumulo.server.tabletserver.TabletStatsKeeper.Operation;
+import org.apache.accumulo.server.tabletserver.compaction.CompactionPlan;
+import org.apache.accumulo.server.tabletserver.compaction.DefaultCompactionStrategy;
+import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionReason;
+import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionRequest;
 import org.apache.accumulo.server.tabletserver.log.DfsLogger;
 import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
 import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage;
@@ -157,15 +161,6 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
 
 public class Tablet {
   
-  enum MajorCompactionReason {
-    // do not change the order, the order of this enum determines the order
-    // in which queued major compactions are executed
-    USER,
-    CHOP,
-    NORMAL,
-    IDLE
-  }
-  
   enum MinorCompactionReason {
     USER, SYSTEM, CLOSE
   }
@@ -927,7 +922,7 @@ public class Tablet {
       
     }
     
-    public void reserveMajorCompactingFiles(Set<FileRef> files) {
+    public void reserveMajorCompactingFiles(Collection<FileRef> files) {
       if (majorCompactingFiles.size() != 0)
         throw new IllegalStateException("Major compacting files not empty " + majorCompactingFiles);
       
@@ -2915,90 +2910,6 @@ public class Tablet {
     return tabletResources.needsMajorCompaction(datafileManager.getDatafileSizes(), reason);
   }
   
-  private class CompactionTuple {
-    private Map<FileRef,Long> filesToCompact;
-    private boolean compactAll;
-    
-    public CompactionTuple(Map<FileRef,Long> filesToCompact, boolean doAll) {
-      this.filesToCompact = filesToCompact;
-      compactAll = doAll;
-    }
-    
-    public Map<FileRef,Long> getFilesToCompact() {
-      return filesToCompact;
-    }
-    
-    public boolean getCompactAll() {
-      return compactAll;
-    }
-  }
-  
-  /**
-   * Returns list of files that need to be compacted by major compactor
-   */
-  
-  private CompactionTuple getFilesToCompact(MajorCompactionReason reason, Map<FileRef,Pair<Key,Key>> falks) {
-    SortedMap<FileRef,DataFileValue> files = datafileManager.getDatafileSizes();
-    
-    Map<FileRef,Long> toCompact;
-    if (reason == MajorCompactionReason.CHOP) {
-      toCompact = findChopFiles(files, falks);
-    } else {
-      toCompact = tabletResources.findMapFilesToCompact(files, reason);
-    }
-    if (toCompact == null)
-      return null;
-    return new CompactionTuple(toCompact, toCompact.size() == files.size());
-  }
-  
-  private Map<FileRef,Pair<Key,Key>> getFirstAndLastKeys(SortedMap<FileRef,DataFileValue> files) throws IOException {
-    FileOperations fileFactory = FileOperations.getInstance();
-    
-    Map<FileRef,Pair<Key,Key>> falks = new HashMap<FileRef,Pair<Key,Key>>();
-    
-    for (Entry<FileRef,DataFileValue> entry : files.entrySet()) {
-      FileRef file = entry.getKey();
-      FileSystem ns = fs.getFileSystemByPath(file.path());
-      FileSKVIterator openReader = fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), acuTableConf);
-      try {
-        Key first = openReader.getFirstKey();
-        Key last = openReader.getLastKey();
-        falks.put(file, new Pair<Key,Key>(first, last));
-      } finally {
-        openReader.close();
-      }
-    }
-    return falks;
-  }
-  
-  private Map<FileRef,Long> findChopFiles(SortedMap<FileRef,DataFileValue> files, Map<FileRef,Pair<Key,Key>> falks) {
-    
-    Map<FileRef,Long> result = new HashMap<FileRef,Long>();
-    
-    for (Entry<FileRef,DataFileValue> entry : files.entrySet()) {
-      FileRef file = entry.getKey();
-      
-      Pair<Key,Key> pair = falks.get(file);
-      if (pair == null) {
-        // file was created or imported after we obtained the first an last keys... there
-        // are a few options here... throw an exception which will cause the compaction to
-        // retry and also cause ugly error message that the admin has to ignore... could
-        // go get the first and last key, but this code is called while the tablet lock
-        // is held... or just compact the file....
-        result.put(file, entry.getValue().getSize());
-      } else {
-        Key first = pair.getFirst();
-        Key last = pair.getSecond();
-        // If first and last are null, it's an empty file. Add it to the compact set so it goes away.
-        if ((first == null && last == null) || !this.extent.contains(first.getRow()) || !this.extent.contains(last.getRow())) {
-          result.put(file, entry.getValue().getSize());
-        }
-      }
-    }
-    return result;
-    
-  }
-  
   /**
    * Returns an int representing the total block size of the mapfiles served by this tablet.
    * 
@@ -3142,20 +3053,20 @@ public class Tablet {
   
   private CompactionStats _majorCompact(MajorCompactionReason reason) throws IOException, CompactionCanceledException {
     
-    boolean propogateDeletes;
-    
     long t1, t2, t3;
     
-    // acquire first and last key info outside of tablet lock
-    Map<FileRef,Pair<Key,Key>> falks = null;
-    if (reason == MajorCompactionReason.CHOP)
-      falks = getFirstAndLastKeys(datafileManager.getDatafileSizes());
-    
-    Map<FileRef,Long> filesToCompact;
+    // acquire file info outside of tablet lock
+    DefaultCompactionStrategy strategy = new DefaultCompactionStrategy();
+    MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, fs, acuTableConf);
+    request.setFiles(datafileManager.getDatafileSizes());
+    strategy.gatherInformation(request);
+
+    Map<FileRef, DataFileValue> filesToCompact;
     
     int maxFilesToCompact = acuTableConf.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN);
     
     CompactionStats majCStats = new CompactionStats();
+    CompactionPlan plan;
     
     synchronized (this) {
       // plan all that work that needs to be done in the sync block... then do the actual work
@@ -3179,28 +3090,22 @@ public class Tablet {
         // removed by a major compaction
         cleanUpFiles(fs, fs.listStatus(this.location), false);
       }
-      
-      // getFilesToCompact() and cleanUpFiles() both
-      // do dir listings, which means two calls to the namenode
-      // we should refactor so that there is only one call
-      CompactionTuple ret = getFilesToCompact(reason, falks);
-      if (ret == null) {
-        // nothing to compact
+      request.setFiles(datafileManager.getDatafileSizes());
+      plan = strategy.getCompactionPlan(request);
+      if (plan == null || plan.passes.isEmpty()) {
         return majCStats;
       }
-      filesToCompact = ret.getFilesToCompact();
-      
-      if (!ret.getCompactAll()) {
-        // since not all files are being compacted, we want to propagate delete entries
-        propogateDeletes = true;
-      } else {
-        propogateDeletes = false;
-      }
+      log.debug("Major compaction plan: " + plan);
+      if (plan.passes.size() > 1)
+        log.info("Multiple passes presently not supported, only performing the first pass");
+      if (plan.passes.get(0).outputFiles != 1)
+        log.warn("Only one output file is supported, but " + plan.passes.get(0).outputFiles + " requested");
+      filesToCompact = new HashMap<FileRef, DataFileValue>(request.getFiles());
+      filesToCompact.keySet().retainAll(plan.passes.get(0).inputFiles);
       
       t3 = System.currentTimeMillis();
       
       datafileManager.reserveMajorCompactingFiles(filesToCompact.keySet());
-      
     }
     
     try {
@@ -3208,7 +3113,7 @@ public class Tablet {
       log.debug(String.format("MajC initiate lock %.2f secs, wait %.2f secs", (t3 - t2) / 1000.0, (t2 - t1) / 1000.0));
       
       Pair<Long,List<IteratorSetting>> compactionId = null;
-      if (!propogateDeletes) {
+      if (!plan.propogateDeletes) {
         // compacting everything, so update the compaction id in !METADATA
         try {
           compactionId = getCompactionID();
@@ -3247,7 +3152,7 @@ public class Tablet {
         
         Set<FileRef> smallestFiles = removeSmallest(filesToCompact, numToCompact);
         
-        FileRef fileName = getNextMapFilename((filesToCompact.size() == 0 && !propogateDeletes) ? "A" : "C");
+        FileRef fileName = getNextMapFilename((filesToCompact.size() == 0 && !plan.propogateDeletes) ? "A" : "C");
         FileRef compactTmpName = new FileRef(fileName.path().toString() + "_tmp");
         
         Span span = Trace.start("compactFiles");
@@ -3272,10 +3177,11 @@ public class Tablet {
           copy.keySet().retainAll(smallestFiles);
           
           log.debug("Starting MajC " + extent + " (" + reason + ") " + copy.keySet() + " --> " + compactTmpName + "  " + compactionIterators);
-          
+
           // always propagate deletes, unless last batch
-          Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size() == 0 ? propogateDeletes : true, acuTableConf, extent,
-              cenv, compactionIterators, reason);
+          boolean lastBatch = filesToCompact.isEmpty();
+          Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, lastBatch ? plan.propogateDeletes : true, acuTableConf, extent,
+              cenv, compactionIterators, reason, strategy.getCompactionWriter());
           
           CompactionStats mcs = compactor.call();
           
@@ -3284,6 +3190,9 @@ public class Tablet {
           span.data("written", "" + mcs.getEntriesWritten());
           majCStats.add(mcs);
           
+          if (lastBatch) {
+            smallestFiles.addAll(plan.deleteFiles);
+          }
           datafileManager.bringMajorCompactionOnline(smallestFiles, compactTmpName, fileName,
               filesToCompact.size() == 0 && compactionId != null ? compactionId.getFirst() : null,
               new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten()));
@@ -3291,14 +3200,13 @@ public class Tablet {
           // when major compaction produces a file w/ zero entries, it will be deleted... do not want
           // to add the deleted file
           if (filesToCompact.size() > 0 && mcs.getEntriesWritten() > 0) {
-            filesToCompact.put(fileName, mcs.getFileSize());
+            filesToCompact.put(fileName, new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten()));
           }
         } finally {
           span.stop();
         }
         
       }
-      
       return majCStats;
     } finally {
       synchronized (Tablet.this) {
@@ -3307,7 +3215,7 @@ public class Tablet {
     }
   }
   
-  private Set<FileRef> removeSmallest(Map<FileRef,Long> filesToCompact, int maxFilesToCompact) {
+  private Set<FileRef> removeSmallest(Map<FileRef,DataFileValue> filesToCompact, int maxFilesToCompact) {
     // ensure this method works properly when multiple files have the same size
     
     PriorityQueue<Pair<FileRef,Long>> fileHeap = new PriorityQueue<Pair<FileRef,Long>>(filesToCompact.size(), new Comparator<Pair<FileRef,Long>>() {
@@ -3321,9 +3229,9 @@ public class Tablet {
       }
     });
     
-    for (Iterator<Entry<FileRef,Long>> iterator = filesToCompact.entrySet().iterator(); iterator.hasNext();) {
-      Entry<FileRef,Long> entry = iterator.next();
-      fileHeap.add(new Pair<FileRef,Long>(entry.getKey(), entry.getValue()));
+    for (Iterator<Entry<FileRef,DataFileValue>> iterator = filesToCompact.entrySet().iterator(); iterator.hasNext();) {
+      Entry<FileRef,DataFileValue> entry = iterator.next();
+      fileHeap.add(new Pair<FileRef,Long>(entry.getKey(), entry.getValue().getSize()));
     }
     
     Set<FileRef> smallestFiles = new HashSet<FileRef>();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
index 4e06b6d..b077729 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
@@ -175,7 +175,7 @@ import org.apache.accumulo.server.tabletserver.RowLocks.RowLock;
 import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
 import org.apache.accumulo.server.tabletserver.Tablet.KVEntry;
 import org.apache.accumulo.server.tabletserver.Tablet.LookupResult;
-import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
+import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionReason;
 import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
 import org.apache.accumulo.server.tabletserver.Tablet.ScanBatch;
 import org.apache.accumulo.server.tabletserver.Tablet.Scanner;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
index 78313e7..487898b 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
@@ -19,14 +19,12 @@ package org.apache.accumulo.server.tabletserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -35,13 +33,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.accumulo.trace.instrument.TraceExecutorService;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
-import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
@@ -51,10 +47,14 @@ import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
-import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
 import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.server.tabletserver.compaction.CompactionPlan;
+import org.apache.accumulo.server.tabletserver.compaction.DefaultCompactionStrategy;
+import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionReason;
+import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionRequest;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.apache.accumulo.trace.instrument.TraceExecutorService;
 import org.apache.log4j.Logger;
 
 /**
@@ -81,6 +81,8 @@ public class TabletServerResourceManager {
   
   private HashSet<TabletResourceManager> tabletResources;
   
+  private final VolumeManager fs;
+  
   private FileManager fileManager;
   
   private MemoryManager memoryManager;
@@ -143,6 +145,7 @@ public class TabletServerResourceManager {
   
   public TabletServerResourceManager(Instance instance, VolumeManager fs) {
     this.conf = new ServerConfiguration(instance);
+    this.fs = fs;
     final AccumuloConfiguration acuConf = conf.getConfiguration();
     
     long maxMemory = acuConf.getMemoryInBytes(Property.TSERV_MAXMEM);
@@ -455,16 +458,6 @@ public class TabletServerResourceManager {
     tabletResources.remove(tr);
   }
   
-  private class MapFileInfo {
-    private final FileRef path;
-    private final long size;
-    
-    MapFileInfo(FileRef path, long size) {
-      this.path = path;
-      this.size = size;
-    }
-  }
-  
   public class TabletResourceManager {
     
     private final long creationTime = System.currentTimeMillis();
@@ -545,90 +538,6 @@ public class TabletServerResourceManager {
     // BEGIN methods that Tablets call to make decisions about major compaction
     // when too many files are open, we may want tablets to compact down
     // to one map file
-    Map<FileRef,Long> findMapFilesToCompact(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
-      if (reason == MajorCompactionReason.USER) {
-        Map<FileRef,Long> files = new HashMap<FileRef,Long>();
-        for (Entry<FileRef,DataFileValue> entry : tabletFiles.entrySet()) {
-          files.put(entry.getKey(), entry.getValue().getSize());
-        }
-        return files;
-      }
-      
-      if (tabletFiles.size() <= 1)
-        return null;
-      TreeSet<MapFileInfo> candidateFiles = new TreeSet<MapFileInfo>(new Comparator<MapFileInfo>() {
-        @Override
-        public int compare(MapFileInfo o1, MapFileInfo o2) {
-          if (o1 == o2)
-            return 0;
-          if (o1.size < o2.size)
-            return -1;
-          if (o1.size > o2.size)
-            return 1;
-          return o1.path.compareTo(o2.path);
-        }
-      });
-      
-      double ratio = tableConf.getFraction(Property.TABLE_MAJC_RATIO);
-      int maxFilesToCompact = tableConf.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN);
-      int maxFilesPerTablet = tableConf.getMaxFilesPerTablet();
-      
-      for (Entry<FileRef,DataFileValue> entry : tabletFiles.entrySet()) {
-        candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize()));
-      }
-      
-      long totalSize = 0;
-      for (MapFileInfo mfi : candidateFiles) {
-        totalSize += mfi.size;
-      }
-      
-      Map<FileRef,Long> files = new HashMap<FileRef,Long>();
-      
-      while (candidateFiles.size() > 1) {
-        MapFileInfo max = candidateFiles.last();
-        if (max.size * ratio <= totalSize) {
-          files.clear();
-          for (MapFileInfo mfi : candidateFiles) {
-            files.put(mfi.path, mfi.size);
-            if (files.size() >= maxFilesToCompact)
-              break;
-          }
-          
-          break;
-        }
-        totalSize -= max.size;
-        candidateFiles.remove(max);
-      }
-      
-      int totalFilesToCompact = 0;
-      if (tabletFiles.size() > maxFilesPerTablet)
-        totalFilesToCompact = tabletFiles.size() - maxFilesPerTablet + 1;
-      
-      totalFilesToCompact = Math.min(totalFilesToCompact, maxFilesToCompact);
-      
-      if (files.size() < totalFilesToCompact) {
-        
-        TreeMap<FileRef,DataFileValue> tfc = new TreeMap<FileRef,DataFileValue>(tabletFiles);
-        tfc.keySet().removeAll(files.keySet());
-        
-        // put data in candidateFiles to sort it
-        candidateFiles.clear();
-        for (Entry<FileRef,DataFileValue> entry : tfc.entrySet())
-          candidateFiles.add(new MapFileInfo(entry.getKey(), entry.getValue().getSize()));
-        
-        for (MapFileInfo mfi : candidateFiles) {
-          files.put(mfi.path, mfi.size);
-          if (files.size() >= totalFilesToCompact)
-            break;
-        }
-      }
-      
-      if (files.size() == 0)
-        return null;
-      
-      return files;
-    }
-    
     boolean needsMajorCompaction(SortedMap<FileRef,DataFileValue> tabletFiles, MajorCompactionReason reason) {
       if (closed)
         return false;// throw new IOException("closed");
@@ -652,11 +561,18 @@ public class TabletServerResourceManager {
         if (idleTime < tableConf.getTimeInMillis(Property.TABLE_MAJC_COMPACTALL_IDLETIME)) {
           return false;
         }
-      }/*
-        * else{ threshold = tableConf.getCount(Property.TABLE_MAJC_THRESHOLD); }
-        */
-      
-      return findMapFilesToCompact(tabletFiles, reason) != null;
+      }
+      DefaultCompactionStrategy strategy = new DefaultCompactionStrategy();
+      MajorCompactionRequest request = new MajorCompactionRequest(tablet.getExtent(), reason, TabletServerResourceManager.this.fs, tableConf);
+      request.setFiles(tabletFiles);
+      try {
+        CompactionPlan plan = strategy.getCompactionPlan(request);
+        if (plan == null || plan.passes.isEmpty())
+          return false;
+        return true;
+      } catch (IOException ex) {
+        return false;
+      }
     }
     
     // END methods that Tablets call to make decisions about major compaction
@@ -710,7 +626,7 @@ public class TabletServerResourceManager {
   }
   
   public void executeMajorCompaction(KeyExtent tablet, Runnable compactionTask) {
-    if (tablet.equals(RootTable.EXTENT)) {
+    if (tablet.isRootTablet()) {
       rootMajorCompactionThreadPool.execute(compactionTask);
     } else if (tablet.isMeta()) {
       defaultMajorCompactionThreadPool.execute(compactionTask);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPass.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPass.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPass.java
new file mode 100644
index 0000000..0ac6199
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPass.java
@@ -0,0 +1,18 @@
+package org.apache.accumulo.server.tabletserver.compaction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.server.fs.FileRef;
+
+/**
+ * Information about a single compaction pass: input files and the number of output files to write.
+ * Presently, the number of output files must always be 1.
+ */
+public class CompactionPass {
+  public List<FileRef> inputFiles = new ArrayList<FileRef>();
+  public int outputFiles = 1;
+  public String toString() {
+    return inputFiles.toString() + " -> " + outputFiles + " files";
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java
new file mode 100644
index 0000000..0a4d734
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java
@@ -0,0 +1,28 @@
+package org.apache.accumulo.server.tabletserver.compaction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.server.fs.FileRef;
+
+/**
+ * A plan for a compaction: the passes to run over input files, producing output files to replace them,
+ * the files that are *not* inputs to the compaction and should simply be deleted, and whether or not to
+ * propagate deletes from input files to output files.
+ */
+public class CompactionPlan {
+  public List<CompactionPass> passes = new ArrayList<CompactionPass>();
+  public List<FileRef> deleteFiles = new ArrayList<FileRef>();
+  public boolean propogateDeletes = true;
+  public String toString() {
+    StringBuilder b = new StringBuilder();
+    b.append(passes.toString());
+    if (!deleteFiles.isEmpty()) { 
+      b.append(" files to be deleted ");
+      b.append(deleteFiles);
+    }
+    b.append(" propogateDeletes ");
+    b.append(propogateDeletes);
+    return b.toString(); 
+  }
+}

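CompactionPass and CompactionPlan are plain data holders that a strategy fills in directly. A small sketch of building a single-pass plan, with hypothetical file paths:

    import org.apache.accumulo.server.fs.FileRef;
    import org.apache.accumulo.server.tabletserver.compaction.CompactionPass;
    import org.apache.accumulo.server.tabletserver.compaction.CompactionPlan;

    public class PlanSketch {
      static CompactionPlan singlePass() {
        CompactionPlan plan = new CompactionPlan();
        CompactionPass pass = new CompactionPass();
        // hypothetical input files; outputFiles keeps its default of 1
        pass.inputFiles.add(new FileRef("hdfs://nn/accumulo/tables/2/t-0001/F0001.rf"));
        pass.inputFiles.add(new FileRef("hdfs://nn/accumulo/tables/2/t-0001/F0002.rf"));
        plan.passes.add(pass);
        // false is only correct when every file in the tablet is an input
        plan.propogateDeletes = false;
        return plan;
      }
    }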
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
new file mode 100644
index 0000000..1564f74
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
@@ -0,0 +1,43 @@
+package org.apache.accumulo.server.tabletserver.compaction;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVWriter;
+
+/**
+ * The interface for customizing major compactions.
+ */
+public interface CompactionStrategy {
+  
+  /**
+   * Called for each output key/value to determine which file should get which key/value pair.
+   */
+  public interface Writer {
+    void write(Key key, Value value, List<FileSKVWriter> outputFiles) throws IOException;
+  }
+
+  /**
+   * Called prior to obtaining the tablet lock, useful for examining metadata or indexes.
+   * @param request basic details about the tablet
+   * @throws IOException
+   */
+  void gatherInformation(MajorCompactionRequest request) throws IOException;
+  
+  /** 
+   * Get the plan for compacting a tablet's files. Called while holding the tablet lock, so it should not do any blocking work.
+   * @param request basic details about the tablet
+   * @return the plan for a major compaction
+   * @throws IOException
+   */
+  CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException;
+  
+  /**
+   * Get the callback for this compaction to determine where to write the output.
+   * @return the Writer used to route each key/value pair to an output file
+   */
+  Writer getCompactionWriter();
+  
+}

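To illustrate the contract, here is a minimal sketch of a custom strategy (hypothetical, not part of this commit) that compacts every file once a tablet accumulates more than ten of them, and otherwise returns an empty plan:

    import java.io.IOException;

    import org.apache.accumulo.server.tabletserver.compaction.CompactionPass;
    import org.apache.accumulo.server.tabletserver.compaction.CompactionPlan;
    import org.apache.accumulo.server.tabletserver.compaction.CompactionStrategy;
    import org.apache.accumulo.server.tabletserver.compaction.DefaultWriter;
    import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionRequest;

    public class TenFileCompactionStrategy implements CompactionStrategy {

      @Override
      public void gatherInformation(MajorCompactionRequest request) throws IOException {
        // nothing to examine outside the tablet lock for this simple policy
      }

      @Override
      public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
        CompactionPlan plan = new CompactionPlan();
        if (request.getFiles().size() > 10) {
          CompactionPass pass = new CompactionPass();
          pass.inputFiles.addAll(request.getFiles().keySet());
          plan.passes.add(pass);
          plan.propogateDeletes = false; // every file is an input, so deletes can be dropped
        }
        return plan; // a plan with no passes means "nothing to do"
      }

      @Override
      public CompactionStrategy.Writer getCompactionWriter() {
        return new DefaultWriter(); // append everything to the single output file
      }
    }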
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java
new file mode 100644
index 0000000..77cc5d3
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java
@@ -0,0 +1,196 @@
+package org.apache.accumulo.server.tabletserver.compaction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.fs.FileRef;
+
+public class DefaultCompactionStrategy implements CompactionStrategy {
+  
+  Map<FileRef,Pair<Key,Key>> firstAndLastKeys = null;
+  
+  @Override
+  public void gatherInformation(MajorCompactionRequest request) throws IOException {
+    if (request.getReason() == MajorCompactionReason.CHOP) {
+      firstAndLastKeys = getFirstAndLastKeys(request);
+    }
+  }
+  @Override
+  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
+    CompactionPlan result = new CompactionPlan();
+    
+    List<FileRef> toCompact;
+    MajorCompactionReason reason = request.getReason();
+    if (reason == MajorCompactionReason.CHOP) {
+      toCompact = findChopFiles(request);
+    } else {
+      toCompact = findMapFilesToCompact(request);
+    }
+    CompactionPass pass = new CompactionPass();
+    pass.inputFiles = toCompact;
+    if (toCompact == null || toCompact.isEmpty())
+      return result;
+    result.passes.add(pass);
+    result.propogateDeletes = toCompact.size() != request.getFiles().size();
+    return result;
+  }
+  
+  private Map<FileRef,Pair<Key,Key>> getFirstAndLastKeys(MajorCompactionRequest request) throws IOException {
+    Map<FileRef,Pair<Key,Key>> result = new HashMap<FileRef,Pair<Key,Key>>();
+    
+    for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) {
+      FileRef file = entry.getKey();
+      FileSKVIterator openReader = request.openReader(file);
+      try {
+        Key first = openReader.getFirstKey();
+        Key last = openReader.getLastKey();
+        result.put(file, new Pair<Key,Key>(first, last));
+      } finally {
+        openReader.close();
+      }
+    }
+    return result;
+  }
+
+  
+  List<FileRef> findChopFiles(MajorCompactionRequest request) throws IOException {
+    List<FileRef> result = new ArrayList<FileRef>();
+    if (firstAndLastKeys == null) {
+      // someone called getCompactionPlan without calling gatherInformation: compact everything
+      result.addAll(request.getFiles().keySet());
+      return result;
+    } 
+    
+    for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) {
+      FileRef file = entry.getKey();
+      Pair<Key,Key> pair = firstAndLastKeys.get(file);
+      if (pair == null) {
+        // file was created or imported after we obtained the first and last keys... there
+        // are a few options here... throw an exception which will cause the compaction to
+        // retry and also cause ugly error message that the admin has to ignore... could
+        // go get the first and last key, but this code is called while the tablet lock
+        // is held... or just compact the file....
+        result.add(file);
+      } else {
+        Key first = pair.getFirst();
+        Key last = pair.getSecond();
+        // If first and last are null, it's an empty file. Add it to the compact set so it goes away.
+        KeyExtent extent = request.getExtent();
+        if ((first == null && last == null) || !extent.contains(first.getRow()) || !extent.contains(last.getRow())) {
+          result.add(file);
+        }
+      }
+    }
+    return result;
+  }
+  
+  private static class CompactionFile {
+    public FileRef file;
+    public long size;
+    public CompactionFile(FileRef file, long size) {
+      super();
+      this.file = file;
+      this.size = size;
+    }
+  }
+
+  
+  private List<FileRef> findMapFilesToCompact(MajorCompactionRequest request) {
+    MajorCompactionReason reason = request.getReason();
+    if (reason == MajorCompactionReason.USER) {
+      return new ArrayList<FileRef>(request.getFiles().keySet());
+    }
+    
+    if (request.getFiles().size() <= 1)
+      return null;
+    TreeSet<CompactionFile> candidateFiles = new TreeSet<CompactionFile>(new Comparator<CompactionFile>() {
+      @Override
+      public int compare(CompactionFile o1, CompactionFile o2) {
+        if (o1 == o2)
+          return 0;
+        if (o1.size < o2.size)
+          return -1;
+        if (o1.size > o2.size)
+          return 1;
+        return o1.file.compareTo(o2.file);
+      }
+    });
+    
+    double ratio = Double.parseDouble(request.getTableConfig(Property.TABLE_MAJC_RATIO.getKey()));
+    int maxFilesToCompact = Integer.parseInt(request.getTableConfig(Property.TSERV_MAJC_THREAD_MAXOPEN.getKey()));
+    int maxFilesPerTablet = request.getMaxFilesPerTablet();
+    
+    for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) {
+      candidateFiles.add(new CompactionFile(entry.getKey(), entry.getValue().getSize()));
+    }
+    
+    long totalSize = 0;
+    for (CompactionFile mfi : candidateFiles) {
+      totalSize += mfi.size;
+    }
+    
+    List<FileRef> files = new ArrayList<FileRef>();
+    
+    while (candidateFiles.size() > 1) {
+      CompactionFile max = candidateFiles.last();
+      if (max.size * ratio <= totalSize) {
+        files.clear();
+        for (CompactionFile mfi : candidateFiles) {
+          files.add(mfi.file);
+          if (files.size() >= maxFilesToCompact)
+            break;
+        }
+        
+        break;
+      }
+      totalSize -= max.size;
+      candidateFiles.remove(max);
+    }
+    
+    int totalFilesToCompact = 0;
+    if (request.getFiles().size() > maxFilesPerTablet)
+      totalFilesToCompact = request.getFiles().size() - maxFilesPerTablet + 1;
+    
+    totalFilesToCompact = Math.min(totalFilesToCompact, maxFilesToCompact);
+    
+    if (files.size() < totalFilesToCompact) {
+      
+      TreeMap<FileRef,Long> tfc = new TreeMap<FileRef,Long>();
+      for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet()) {
+          tfc.put(entry.getKey(), entry.getValue().getSize());
+      }
+      tfc.keySet().removeAll(files);
+      
+      // put data in candidateFiles to sort it
+      candidateFiles.clear();
+      for (Entry<FileRef,Long> entry : tfc.entrySet())
+        candidateFiles.add(new CompactionFile(entry.getKey(), entry.getValue()));
+      
+      for (CompactionFile mfi : candidateFiles) {
+        files.add(mfi.file);
+        if (files.size() >= totalFilesToCompact)
+          break;
+      }
+    }
+    
+    return files;
+  }
+  @Override
+  public Writer getCompactionWriter() {
+    return new DefaultWriter();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultWriter.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultWriter.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultWriter.java
new file mode 100644
index 0000000..a42e532
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultWriter.java
@@ -0,0 +1,17 @@
+package org.apache.accumulo.server.tabletserver.compaction;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVWriter;
+
+public class DefaultWriter implements CompactionStrategy.Writer {
+  
+  @Override
+  public void write(Key key, Value value, List<FileSKVWriter> outputFiles) throws IOException {
+    outputFiles.get(0).append(key, value);
+  }
+  
+}

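A custom Writer can wrap this behavior; since the tablet currently warns when a pass requests more than one output file, a realistic sketch just decorates the single-file case, for example to count entries (hypothetical, not part of this commit):

    import java.io.IOException;
    import java.util.List;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.file.FileSKVWriter;
    import org.apache.accumulo.server.tabletserver.compaction.CompactionStrategy;

    public class CountingWriter implements CompactionStrategy.Writer {
      private long entriesWritten = 0;

      @Override
      public void write(Key key, Value value, List<FileSKVWriter> outputFiles) throws IOException {
        // delegate to the first (and presently only) output file
        outputFiles.get(0).append(key, value);
        entriesWritten++;
      }

      public long getEntriesWritten() {
        return entriesWritten;
      }
    }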
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionReason.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionReason.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionReason.java
new file mode 100644
index 0000000..e7f4033
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionReason.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.compaction;
+
+
+public enum MajorCompactionReason {
+  // do not change the order, the order of this enum determines the order
+  // in which queued major compactions are executed
+  USER,
+  CHOP,
+  NORMAL,
+  IDLE
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
new file mode 100644
index 0000000..817dcfe
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
@@ -0,0 +1,85 @@
+package org.apache.accumulo.server.tabletserver.compaction;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Information that can be used to determine how, and whether, a tablet should be major compacted.
+ */
+public class MajorCompactionRequest {
+  final private KeyExtent extent;
+  final private MajorCompactionReason reason;
+  final private VolumeManager volumeManager;
+  final private AccumuloConfiguration tableConfig;
+  private Map<FileRef,DataFileValue> files;
+  
+  public MajorCompactionRequest(
+      KeyExtent extent, 
+      MajorCompactionReason reason, 
+      VolumeManager manager, 
+      AccumuloConfiguration tabletConfig) {
+    this.extent = extent;
+    this.reason = reason;
+    this.volumeManager = manager;
+    this.tableConfig = tabletConfig;
+    this.files = Collections.emptyMap();
+  }
+  
+  public KeyExtent getExtent() {
+    return extent;
+  }
+  
+  public MajorCompactionReason getReason() {
+    return reason;
+  }
+  
+  public Map<FileRef,DataFileValue> getFiles() {
+    return files;
+  }
+  
+  public void setFiles(Map<FileRef,DataFileValue> update) {
+    this.files = Collections.unmodifiableMap(update);
+  }
+  
+  FileStatus[] listStatus(Path path) throws IOException {
+    // @TODO verify the file isn't some random file in HDFS
+    return volumeManager.listStatus(path);
+  }
+  
+  FileSKVIterator openReader(FileRef ref) throws IOException {
+    // @TODO verify the file isn't some random file in HDFS
+    // @TODO ensure these files are always closed?
+    FileOperations fileFactory = FileOperations.getInstance();
+    FileSystem ns = volumeManager.getFileSystemByPath(ref.path());
+    FileSKVIterator openReader = fileFactory.openReader(ref.path().toString(), true, ns, ns.getConf(), tableConfig);
+    return openReader;
+  }
+  
+  public Map<String,String> getTableProperties() {
+    return tableConfig.getAllPropertiesWithPrefix(Property.TABLE_PREFIX);
+  }
+
+  public String getTableConfig(String key) {
+    Property property = Property.getPropertyByKey(key);
+    if (property == null || property.isSensitive())
+      throw new RuntimeException("Unable to access the configuration value " + key);
+    return tableConfig.get(property);
+  }
+  
+  public int getMaxFilesPerTablet() {
+    return tableConfig.getMaxFilesPerTablet();
+  }
+}

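The request is consumed in two phases, mirroring Tablet._majorCompact above: gatherInformation runs outside the tablet lock and may do I/O, then the file set is refreshed and getCompactionPlan runs under the lock. A condensed sketch of that flow (the parameters stand in for the tablet's fields):

    import java.io.IOException;
    import java.util.SortedMap;

    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.core.data.KeyExtent;
    import org.apache.accumulo.core.metadata.schema.DataFileValue;
    import org.apache.accumulo.server.fs.FileRef;
    import org.apache.accumulo.server.fs.VolumeManager;
    import org.apache.accumulo.server.tabletserver.compaction.CompactionPlan;
    import org.apache.accumulo.server.tabletserver.compaction.DefaultCompactionStrategy;
    import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionReason;
    import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionRequest;

    public class StrategyFlowSketch {
      static CompactionPlan plan(KeyExtent extent, MajorCompactionReason reason,
          VolumeManager fs, AccumuloConfiguration tableConf,
          SortedMap<FileRef,DataFileValue> files) throws IOException {
        DefaultCompactionStrategy strategy = new DefaultCompactionStrategy();
        MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, fs, tableConf);
        request.setFiles(files);             // snapshot taken outside the tablet lock
        strategy.gatherInformation(request); // may open files and read first/last keys
        // ... the real caller re-acquires the tablet lock here ...
        request.setFiles(files);             // refresh: the file set may have changed
        return strategy.getCompactionPlan(request); // must not block
      }
    }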
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4c3da02b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java
new file mode 100644
index 0000000..a636509
--- /dev/null
+++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java
@@ -0,0 +1,263 @@
+package org.apache.accumulo.server.tabletserver.compaction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.NoSuchMetaStoreException;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class DefaultCompactionStrategyTest {
+
+  private static Pair<Key,Key> keys(String firstString, String secondString) {
+    Key first = null;
+    if (firstString != null)
+      first = new Key(new Text(firstString));
+    Key second = null;
+    if (secondString != null)
+      second = new Key(new Text(secondString));
+    return new Pair<Key, Key>(first, second);
+  }
+  
+  static final Map<String, Pair<Key, Key>> fakeFiles = new HashMap<String, Pair<Key, Key>>();
+  static {
+    fakeFiles.put("file1", keys("b", "m"));
+    fakeFiles.put("file2", keys("n", "z"));
+    fakeFiles.put("file3", keys("a", "y"));
+    fakeFiles.put("file4", keys(null, null));
+  }
+  
+  // Mock FileSKVIterator, which will provide first/last keys above
+  private static class TestFileSKVIterator implements FileSKVIterator {
+    private String filename;
+
+    TestFileSKVIterator(String filename) {
+      this.filename = filename;
+    }
+    
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    }
+
+    @Override
+    public boolean hasTop() {
+      return false;
+    }
+
+    @Override
+    public void next() throws IOException {
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    }
+
+    @Override
+    public Key getTopKey() {
+      return null;
+    }
+
+    @Override
+    public Value getTopValue() {
+      return null;
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+      return null;
+    }
+
+    @Override
+    public Key getFirstKey() throws IOException {
+      Pair<Key,Key> pair = fakeFiles.get(filename);
+      if (pair == null)
+        return null;
+      return pair.getFirst();
+    }
+
+    @Override
+    public Key getLastKey() throws IOException {
+      Pair<Key,Key> pair = fakeFiles.get(filename);
+      if (pair == null)
+        return null;
+      return pair.getSecond();
+    }
+
+    @Override
+    public DataInputStream getMetaStore(String name) throws IOException, NoSuchMetaStoreException {
+      return null;
+    }
+
+    @Override
+    public void closeDeepCopies() throws IOException {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+    
+  }
+
+  static final DefaultConfiguration dfault = AccumuloConfiguration.getDefaultConfiguration();
+  private static class TestCompactionRequest extends MajorCompactionRequest {
+    @Override
+    FileSKVIterator openReader(FileRef ref) throws IOException {
+      return new TestFileSKVIterator(ref.toString());
+    }
+
+    TestCompactionRequest(KeyExtent extent, 
+        MajorCompactionReason reason, 
+        Map<FileRef, DataFileValue> files) {
+        super(extent, reason, null, dfault);
+        setFiles(files);
+    }
+    
+  }
+  
+  
+  private MajorCompactionRequest createRequest(MajorCompactionReason reason, Object... objs) throws IOException {
+    return createRequest(new KeyExtent(new Text("0"), null, null), reason, objs);
+  }
+  
+  private MajorCompactionRequest createRequest(KeyExtent extent, MajorCompactionReason reason, Object... objs) throws IOException {
+    Map<FileRef, DataFileValue> files = new HashMap<FileRef, DataFileValue>();
+    for (int i = 0; i < objs.length; i += 2) {
+      files.put(new FileRef((String)objs[i]), new DataFileValue(((Number)objs[i+1]).longValue(), 0));
+    }
+    return new TestCompactionRequest(extent, reason, files);
+  }
+  
+  private static Set<String> asSet(String ... strings) {
+    return asSet(Arrays.asList(strings));
+  }
+  
+  private static Set<String> asStringSet(Collection<FileRef> refs) {
+    HashSet<String> result = new HashSet<String>();
+    for (FileRef ref : refs) {
+      result.add(ref.path().toString());
+    }
+    return result;
+  }
+  
+  private static Set<String> asSet(Collection<String> strings) {
+    HashSet<String> result = new HashSet<String>();
+    result.addAll(strings);
+    return result;
+  }
+  
+  @Test
+  public void testGetCompactionPlan() throws Exception {
+    DefaultCompactionStrategy s = new DefaultCompactionStrategy();
+    
+    // do nothing
+    MajorCompactionRequest request = createRequest(MajorCompactionReason.IDLE, "file1", 10, "file2", 10);
+    s.gatherInformation(request);
+    CompactionPlan plan = s.getCompactionPlan(request);
+    assertTrue(plan.passes.isEmpty());
+    
+    // do everything
+    request = createRequest(MajorCompactionReason.IDLE, "file1", 10, "file2", 10, "file3", 10);
+    s.gatherInformation(request);
+    plan = s.getCompactionPlan(request);
+    assertEquals(1, plan.passes.size());
+    CompactionPass pass = plan.passes.get(0);
+    assertEquals(3, pass.inputFiles.size());
+    assertEquals(1, pass.outputFiles);
+    assertFalse(plan.propogateDeletes);
+    
+    // do everything
+    request = createRequest(MajorCompactionReason.USER, "file1", 10, "file2", 10);
+    s.gatherInformation(request);
+    plan = s.getCompactionPlan(request);
+    assertEquals(1, plan.passes.size());
+    pass = plan.passes.get(0);
+    assertEquals(2, pass.inputFiles.size());
+    assertEquals(1, pass.outputFiles);
+    assertFalse(plan.propogateDeletes);
+    
+    // partial
+    request = createRequest(MajorCompactionReason.NORMAL, "file0", 100, "file1", 10, "file2", 10, "file3", 10);
+    s.gatherInformation(request);
+    plan = s.getCompactionPlan(request);
+    assertEquals(1, plan.passes.size());
+    pass = plan.passes.get(0);
+    assertEquals(3, pass.inputFiles.size());
+    assertEquals(1, pass.outputFiles);
+    assertEquals(asStringSet(pass.inputFiles), asSet("file1,file2,file3".split(",")));
+    assertTrue(plan.propogateDeletes);
+    
+    // chop tests
+    // everything overlaps default tablet
+    request = createRequest(MajorCompactionReason.NORMAL, "file1", 10, "file2", 10, "file3", 10);
+    s.gatherInformation(request);
+    plan = s.getCompactionPlan(request);
+    assertEquals(1, plan.passes.size());
+    pass = plan.passes.get(0);
+    assertEquals(3, pass.inputFiles.size());
+    assertEquals(1, pass.outputFiles);
+    assertEquals(asStringSet(pass.inputFiles), asSet("file1,file2,file3".split(",")));
+    assertFalse(plan.propogateDeletes);
+    
+    // Partial overlap
+    KeyExtent extent = new KeyExtent(new Text("0"), new Text("n"), new Text("a"));
+    request = createRequest(extent, MajorCompactionReason.CHOP, "file1", 10, "file2", 10, "file3", 10);
+    s.gatherInformation(request);
+    plan = s.getCompactionPlan(request);
+    assertEquals(1, plan.passes.size());
+    pass = plan.passes.get(0);
+    assertEquals(2, pass.inputFiles.size());
+    assertEquals(1, pass.outputFiles);
+    assertEquals(asStringSet(pass.inputFiles), asSet("file2,file3".split(",")));
+    assertTrue(plan.propogateDeletes);
+
+    // empty file
+    request = createRequest(extent, MajorCompactionReason.CHOP, "file1", 10, "file4", 10);
+    s.gatherInformation(request);
+    plan = s.getCompactionPlan(request);
+    assertEquals(1, plan.passes.size());
+    pass = plan.passes.get(0);
+    assertEquals(1, pass.inputFiles.size());
+    assertEquals(1, pass.outputFiles);
+    assertEquals(asStringSet(pass.inputFiles), asSet("file4".split(",")));
+    assertTrue(plan.propogateDeletes);
+    
+    // file without first/last keys
+    request = createRequest(extent, MajorCompactionReason.CHOP, "file1", 10, "file5", 10);
+    s.gatherInformation(request);
+    plan = s.getCompactionPlan(request);
+    assertEquals(1, plan.passes.size());
+    pass = plan.passes.get(0);
+    assertEquals(1, pass.inputFiles.size());
+    assertEquals(1, pass.outputFiles);
+    assertEquals(asStringSet(pass.inputFiles), asSet("file5".split(",")));
+    assertTrue(plan.propogateDeletes);
+  }
+}

