accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [1/6] accumulo git commit: ACCUMULO-1798 Add ability to specify compaction strategy for user specified compactions.
Date Fri, 05 Dec 2014 03:51:31 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 433b6df06 -> 2f788f482


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
index b3037d3..db8bbfe 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
@@ -18,10 +18,6 @@ package org.apache.accumulo.master.tableOps;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -34,6 +30,7 @@ import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
@@ -55,10 +52,10 @@ import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
 import org.apache.accumulo.master.Master;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.tableOps.UserCompactionConfig;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
@@ -207,95 +204,17 @@ public class CompactRange extends MasterRepo {
   private final String tableId;
   private byte[] startRow;
   private byte[] endRow;
-  private byte[] iterators;
+  private byte[] config;
 
-  public static class CompactionIterators implements Writable {
-    byte[] startRow;
-    byte[] endRow;
-    List<IteratorSetting> iterators;
 
-    public CompactionIterators(byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) {
-      this.startRow = startRow;
-      this.endRow = endRow;
-      this.iterators = iterators;
-    }
-
-    public CompactionIterators() {
-      startRow = null;
-      endRow = null;
-      iterators = Collections.emptyList();
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeBoolean(startRow != null);
-      if (startRow != null) {
-        out.writeInt(startRow.length);
-        out.write(startRow);
-      }
-
-      out.writeBoolean(endRow != null);
-      if (endRow != null) {
-        out.writeInt(endRow.length);
-        out.write(endRow);
-      }
-
-      out.writeInt(iterators.size());
-      for (IteratorSetting is : iterators) {
-        is.write(out);
-      }
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      if (in.readBoolean()) {
-        startRow = new byte[in.readInt()];
-        in.readFully(startRow);
-      } else {
-        startRow = null;
-      }
-
-      if (in.readBoolean()) {
-        endRow = new byte[in.readInt()];
-        in.readFully(endRow);
-      } else {
-        endRow = null;
-      }
-
-      int num = in.readInt();
-      iterators = new ArrayList<IteratorSetting>(num);
-
-      for (int i = 0; i < num; i++) {
-        iterators.add(new IteratorSetting(in));
-      }
-    }
-
-    public Text getEndRow() {
-      if (endRow == null)
-        return null;
-      return new Text(endRow);
-    }
-
-    public Text getStartRow() {
-      if (startRow == null)
-        return null;
-      return new Text(startRow);
-    }
-
-    public List<IteratorSetting> getIterators() {
-      return iterators;
-    }
-  }
-
-  public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) throws ThriftTableOperationException {
+  public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators, CompactionStrategyConfig compactionStrategy)
+      throws ThriftTableOperationException {
     this.tableId = tableId;
     this.startRow = startRow.length == 0 ? null : startRow;
     this.endRow = endRow.length == 0 ? null : endRow;
 
-    if (iterators.size() > 0) {
-      this.iterators = WritableUtils.toByteArray(new CompactionIterators(this.startRow, this.endRow, iterators));
-    } else {
-      iterators = null;
+    if (iterators.size() > 0 || compactionStrategy != null) {
+      this.config = WritableUtils.toByteArray(new UserCompactionConfig(this.startRow, this.endRow, iterators, compactionStrategy));
     }
 
     if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0)
@@ -337,12 +256,12 @@ public class CompactRange extends MasterRepo {
 
           StringBuilder encodedIterators = new StringBuilder();
 
-          if (iterators != null) {
+          if (config != null) {
             Hex hex = new Hex();
             encodedIterators.append(",");
             encodedIterators.append(txidString);
             encodedIterators.append("=");
-            encodedIterators.append(new String(hex.encode(iterators), UTF_8));
+            encodedIterators.append(new String(hex.encode(config), UTF_8));
           }
 
           return (Long.toString(flushID) + encodedIterators).getBytes(UTF_8);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index d4447ab..f9f5b4c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -166,6 +166,7 @@ import org.apache.accumulo.server.master.state.TabletLocationState;
 import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
 import org.apache.accumulo.server.master.state.TabletStateStore;
 import org.apache.accumulo.server.master.state.ZooTabletStateStore;
+import org.apache.accumulo.server.master.tableOps.UserCompactionConfig;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.replication.ZooKeeperInitialization;
@@ -1643,19 +1644,19 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
             tabletsToCompact.add(tablet);
       }
 
-      Long compactionId = null;
+      Pair<Long,UserCompactionConfig> compactionInfo = null;
 
       for (Tablet tablet : tabletsToCompact) {
         // all for the same table id, so only need to read
         // compaction id once
-        if (compactionId == null)
+        if (compactionInfo == null)
           try {
-            compactionId = tablet.getCompactionID().getFirst();
+            compactionInfo = tablet.getCompactionID();
           } catch (NoNodeException e) {
             log.info("Asked to compact table with no compaction id " + ke + " " + e.getMessage());
             return;
           }
-        tablet.compactAll(compactionId);
+        tablet.compactAll(compactionInfo.getFirst(), compactionInfo.getSecond());
       }
 
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 6c31fab..6b2eaf0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -16,8 +16,6 @@
  */
 package org.apache.accumulo.tserver;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -62,6 +60,8 @@ import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.apache.log4j.Logger;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * ResourceManager is responsible for managing the resources of all tablets within a tablet server.
  *
@@ -638,11 +638,13 @@ public class TabletServerResourceManager {
       request.setFiles(tabletFiles);
       try {
         return strategy.shouldCompact(request);
-      } catch (IOException ex) {
-        return false;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
     }
 
+
+
     // END methods that Tablets call to make decisions about major compaction
 
     // tablets call this method to run minor compactions,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java
index 6f69fb0..75c6bd8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionPlan.java
@@ -22,9 +22,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.accumulo.server.fs.FileRef;
-
 import com.google.common.collect.Sets;
+import org.apache.accumulo.server.fs.FileRef;
 
 /**
  * A plan for a compaction: the input files, the files that are *not* inputs to a compaction that should simply be deleted, and the optional parameters used to

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
index 7bc1a80..2d94884 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
@@ -43,6 +43,12 @@ public abstract class CompactionStrategy {
    * {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state stored during shouldCompact will no longer exist when
    * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)} are called.
    * 
+   * <P>
+   * Called while holding the tablet lock, so it should not be doing any blocking.
+   * 
+   * <P>
+   * Since no blocking should be done in this method, it is unexpected that this method will throw IOException. However, since it is in the API, it cannot be
+   * easily removed.
    */
   public abstract boolean shouldCompact(MajorCompactionRequest request) throws IOException;
 
@@ -58,6 +64,10 @@ public abstract class CompactionStrategy {
   /**
    * Get the plan for compacting a tablets files. Called while holding the tablet lock, so it should not be doing any blocking.
    * 
+   * <P>
+   * Since no blocking should be done in this method, it is unexpected that this method will throw IOException. However, since it is in the API, it cannot be
+   * easily removed.
+   * 
    * @param request
    *          basic details about the tablet
    * @return the plan for a major compaction, or null to cancel the compaction.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategy.java
index 8b03d17..1f0dc3a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategy.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.tserver.compaction;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -31,13 +30,13 @@ import org.apache.accumulo.server.fs.FileRef;
 public class DefaultCompactionStrategy extends CompactionStrategy {
 
   @Override
-  public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
+  public boolean shouldCompact(MajorCompactionRequest request) {
     CompactionPlan plan = getCompactionPlan(request);
     return plan != null && !plan.inputFiles.isEmpty();
   }
 
   @Override
-  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
+  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) {
     CompactionPlan result = new CompactionPlan();
 
     List<FileRef> toCompact = findMapFilesToCompact(request);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java
new file mode 100644
index 0000000..9295c30
--- /dev/null
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/EverythingCompactionStrategy.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.tserver.compaction;
+
+import java.io.IOException;
+
+/**
+ * The default compaction strategy for user initiated compactions. This strategy will always select all files.
+ */
+
+public class EverythingCompactionStrategy extends CompactionStrategy {
+
+  @Override
+  public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
+    return request.getFiles().size() > 0;
+  }
+
+  @Override
+  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
+    CompactionPlan plan = new CompactionPlan();
+    plan.inputFiles.addAll(request.getFiles().keySet());
+    return plan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java
index 478939a..6d4dc79 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategy.java
@@ -53,7 +53,7 @@ public class SizeLimitCompactionStrategy extends DefaultCompactionStrategy {
   }
 
   @Override
-  public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
+  public boolean shouldCompact(MajorCompactionRequest request) {
     return super.shouldCompact(filterFiles(request));
   }
 
@@ -63,7 +63,7 @@ public class SizeLimitCompactionStrategy extends DefaultCompactionStrategy {
   }
 
   @Override
-  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
+  public CompactionPlan getCompactionPlan(MajorCompactionRequest request) {
     return super.getCompactionPlan(filterFiles(request));
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index bc55c4f..bc75062 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -45,6 +45,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Durability;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
 import org.apache.accumulo.core.client.impl.DurabilityImpl;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -96,10 +97,11 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles;
 import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.master.tableOps.CompactionIterators;
+import org.apache.accumulo.server.master.tableOps.UserCompactionConfig;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.tablets.UniqueNameAllocator;
 import org.apache.accumulo.server.util.FileUtil;
@@ -1152,7 +1154,7 @@ public class Tablet implements TabletCommitter {
     }
   }
 
-  public Pair<Long,List<IteratorSetting>> getCompactionID() throws NoNodeException {
+  public Pair<Long,UserCompactionConfig> getCompactionID() throws NoNodeException {
     try {
       String zTablePath = Constants.ZROOT + "/" + tabletServer.getInstance().getInstanceID() + Constants.ZTABLES + "/" + extent.getTableId()
           + Constants.ZTABLE_COMPACT_ID;
@@ -1160,7 +1162,7 @@ public class Tablet implements TabletCommitter {
       String[] tokens = new String(ZooReaderWriter.getInstance().getData(zTablePath, null), UTF_8).split(",");
       long compactID = Long.parseLong(tokens[0]);
 
-      CompactionIterators iters = new CompactionIterators();
+      UserCompactionConfig compactionConfig = new UserCompactionConfig();
 
       if (tokens.length > 1) {
         Hex hex = new Hex();
@@ -1168,20 +1170,20 @@ public class Tablet implements TabletCommitter {
         DataInputStream dis = new DataInputStream(bais);
 
         try {
-          iters.readFields(dis);
+          compactionConfig.readFields(dis);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
 
-        KeyExtent ke = new KeyExtent(extent.getTableId(), iters.getEndRow(), iters.getStartRow());
+        KeyExtent ke = new KeyExtent(extent.getTableId(), compactionConfig.getEndRow(), compactionConfig.getStartRow());
 
         if (!ke.overlaps(extent)) {
           // only use iterators if compaction range overlaps
-          iters = new CompactionIterators();
+          compactionConfig = new UserCompactionConfig();
         }
       }
 
-      return new Pair<Long,List<IteratorSetting>>(compactID, iters.getIterators());
+      return new Pair<Long,UserCompactionConfig>(compactID, compactionConfig);
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     } catch (NumberFormatException nfe) {
@@ -1780,21 +1782,34 @@ public class Tablet implements TabletCommitter {
 
     long t1, t2, t3;
 
-    // acquire file info outside of tablet lock
-    CompactionStrategy strategy = Property.createTableInstanceFromPropertyName(tableConfiguration, Property.TABLE_COMPACTION_STRATEGY,
-        CompactionStrategy.class, new DefaultCompactionStrategy());
-    strategy.init(Property.getCompactionStrategyOptions(tableConfiguration));
-
+    Pair<Long,UserCompactionConfig> compactionId = null;
+    CompactionStrategy strategy = null;
     Map<FileRef,Pair<Key,Key>> firstAndLastKeys = null;
-    if (reason == MajorCompactionReason.CHOP) {
+
+    if(reason == MajorCompactionReason.USER){
+      try {
+        compactionId = getCompactionID();
+        strategy = createCompactionStrategy(compactionId.getSecond().getCompactionStrategy());
+      } catch (NoNodeException e) {
+        throw new RuntimeException(e);
+      }
+    } else if (reason == MajorCompactionReason.NORMAL || reason == MajorCompactionReason.IDLE) {
+      strategy = Property.createTableInstanceFromPropertyName(tableConfiguration, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class,
+          new DefaultCompactionStrategy());
+      strategy.init(Property.getCompactionStrategyOptions(tableConfiguration));
+    } else if (reason == MajorCompactionReason.CHOP) {
       firstAndLastKeys = getFirstAndLastKeys(getDatafileManager().getDatafileSizes());
-    } else if (reason != MajorCompactionReason.USER) {
+    } else {
+      throw new IllegalArgumentException("Unknown compaction reason " + reason);
+    }
+
+    if (strategy != null) {
       MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, getTabletServer().getFileSystem(), tableConfiguration);
       request.setFiles(getDatafileManager().getDatafileSizes());
       strategy.gatherInformation(request);
     }
 
-    Map<FileRef,DataFileValue> filesToCompact;
+    Map<FileRef,DataFileValue> filesToCompact = null;
 
     int maxFilesToCompact = tableConfiguration.getCount(Property.TSERV_MAJC_THREAD_MAXOPEN);
 
@@ -1802,6 +1817,7 @@ public class Tablet implements TabletCommitter {
     CompactionPlan plan = null;
 
     boolean propogateDeletes = false;
+    boolean updateCompactionID = false;
 
     synchronized (this) {
       // plan all that work that needs to be done in the sync block... then do the actual work
@@ -1831,8 +1847,6 @@ public class Tablet implements TabletCommitter {
       if (reason == MajorCompactionReason.CHOP) {
         // enforce rules: files with keys outside our range need to be compacted
         inputFiles.addAll(findChopFiles(extent, firstAndLastKeys, allFiles.keySet()));
-      } else if (reason == MajorCompactionReason.USER) {
-        inputFiles.addAll(allFiles.keySet());
       } else {
         MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, fs, tableConfiguration);
         request.setFiles(allFiles);
@@ -1844,32 +1858,48 @@ public class Tablet implements TabletCommitter {
       }
 
       if (inputFiles.isEmpty()) {
-        return majCStats;
+        if (reason == MajorCompactionReason.USER) {
+          // no work to do
+          lastCompactID = compactionId.getFirst();
+          updateCompactionID = true;
+        } else {
+          return majCStats;
+        }
+      } else {
+        // If no original files will exist at the end of the compaction, we do not have to propogate deletes
+        Set<FileRef> droppedFiles = new HashSet<FileRef>();
+        droppedFiles.addAll(inputFiles);
+        if (plan != null)
+          droppedFiles.addAll(plan.deleteFiles);
+        propogateDeletes = !(droppedFiles.equals(allFiles.keySet()));
+        log.debug("Major compaction plan: " + plan + " propogate deletes : " + propogateDeletes);
+        filesToCompact = new HashMap<FileRef,DataFileValue>(allFiles);
+        filesToCompact.keySet().retainAll(inputFiles);
+
+        getDatafileManager().reserveMajorCompactingFiles(filesToCompact.keySet());
       }
-      // If no original files will exist at the end of the compaction, we do not have to propogate deletes
-      Set<FileRef> droppedFiles = new HashSet<FileRef>();
-      droppedFiles.addAll(inputFiles);
-      if (plan != null)
-        droppedFiles.addAll(plan.deleteFiles);
-      propogateDeletes = !(droppedFiles.equals(allFiles.keySet()));
-      log.debug("Major compaction plan: " + plan + " propogate deletes : " + propogateDeletes);
-      filesToCompact = new HashMap<FileRef,DataFileValue>(allFiles);
-      filesToCompact.keySet().retainAll(inputFiles);
 
       t3 = System.currentTimeMillis();
-
-      getDatafileManager().reserveMajorCompactingFiles(filesToCompact.keySet());
     }
 
     try {
 
       log.debug(String.format("MajC initiate lock %.2f secs, wait %.2f secs", (t3 - t2) / 1000.0, (t2 - t1) / 1000.0));
 
-      Pair<Long,List<IteratorSetting>> compactionId = null;
-      if (!propogateDeletes) {
+      if (updateCompactionID) {
+        MetadataTableUtil.updateTabletCompactID(extent, compactionId.getFirst(),tabletServer, getTabletServer().getLock());
+        return majCStats;
+      }
+
+      if (!propogateDeletes && compactionId == null) {
         // compacting everything, so update the compaction id in metadata
         try {
           compactionId = getCompactionID();
+          if (compactionId.getSecond().getCompactionStrategy() != null) {
+            compactionId = null;
+            // TODO maybe return unless chop?
+          }
+
         } catch (NoNodeException e) {
           throw new RuntimeException(e);
         }
@@ -1890,7 +1920,7 @@ public class Tablet implements TabletCommitter {
           }
         }
 
-        compactionIterators = compactionId.getSecond();
+        compactionIterators = compactionId.getSecond().getIterators();
       }
 
       // need to handle case where only one file is being major compacted
@@ -2495,7 +2525,24 @@ public class Tablet implements TabletCommitter {
     initiateMajorCompaction(MajorCompactionReason.CHOP);
   }
 
-  public void compactAll(long compactionId) {
+  private CompactionStrategy createCompactionStrategy(CompactionStrategyConfig strategyConfig) {
+    String context = tableConfiguration.get(Property.TABLE_CLASSPATH);
+    String clazzName = strategyConfig.getClassName();
+    try {
+      Class<? extends CompactionStrategy> clazz;
+      if (context != null && !context.equals(""))
+        clazz = AccumuloVFSClassLoader.getContextManager().loadClass(context, clazzName, CompactionStrategy.class);
+      else
+        clazz = AccumuloVFSClassLoader.loadClass(clazzName, CompactionStrategy.class);
+      CompactionStrategy strategy = clazz.newInstance();
+      strategy.init(strategyConfig.getOptions());
+      return strategy;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void compactAll(long compactionId, UserCompactionConfig compactionConfig) {
     boolean updateMetadata = false;
 
     synchronized (this) {
@@ -2522,8 +2569,25 @@ public class Tablet implements TabletCommitter {
         majorCompactionState = CompactionState.IN_PROGRESS;
         updateMetadata = true;
         lastCompactID = compactionId;
-      } else
-        initiateMajorCompaction(MajorCompactionReason.USER);
+      } else {
+        CompactionStrategyConfig strategyConfig = compactionConfig.getCompactionStrategy();
+        CompactionStrategy strategy = createCompactionStrategy(strategyConfig);
+
+        MajorCompactionRequest request = new MajorCompactionRequest(extent, MajorCompactionReason.USER, getTabletServer().getFileSystem(), tableConfiguration);
+        request.setFiles(getDatafileManager().getDatafileSizes());
+
+        try {
+          if (strategy.shouldCompact(request)) {
+            initiateMajorCompaction(MajorCompactionReason.USER);
+          } else {
+            majorCompactionState = CompactionState.IN_PROGRESS;
+            updateMetadata = true;
+            lastCompactID = compactionId;
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
     }
 
     if (updateMetadata) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
index 80dd9ba..660630e 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
@@ -17,28 +17,27 @@
 package org.apache.accumulo.shell.commands;
 
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
 import org.apache.accumulo.shell.Shell;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.io.Text;
 
 public class CompactCommand extends TableOperation {
-  private Option noFlushOption, waitOpt, profileOpt, cancelOpt;
-  private boolean flush;
-  private Text startRow;
-  private Text endRow;
-  private List<IteratorSetting> iterators;
+  private Option noFlushOption, waitOpt, profileOpt, cancelOpt, strategyOpt, strategyConfigOpt;
+
+  private CompactionConfig compactionConfig = null;
   
   boolean override = false;
-  private boolean wait;
   
   private boolean cancel = false;
 
@@ -59,13 +58,13 @@ public class CompactCommand extends TableOperation {
       }
     } else {
       try {
-        if (wait) {
+        if (compactionConfig.getWait()) {
           Shell.log.info("Compacting table ...");
         }
         
-        shellState.getConnector().tableOperations().compact(tableName, startRow, endRow, iterators, flush, wait);
+        shellState.getConnector().tableOperations().compact(tableName, compactionConfig);
         
-        Shell.log.info("Compaction of table " + tableName + " " + (wait ? "completed" : "started") + " for given range");
+        Shell.log.info("Compaction of table " + tableName + " " + (compactionConfig.getWait() ? "completed" : "started") + " for given range");
       } catch (Exception ex) {
         throw new AccumuloException(ex);
       }
@@ -85,10 +84,12 @@ public class CompactCommand extends TableOperation {
       cancel = false;
     }
 
-    flush = !cl.hasOption(noFlushOption.getOpt());
-    startRow = OptUtil.getStartRow(cl);
-    endRow = OptUtil.getEndRow(cl);
-    wait = cl.hasOption(waitOpt.getOpt());
+    compactionConfig = new CompactionConfig();
+
+    compactionConfig.setFlush(!cl.hasOption(noFlushOption.getOpt()));
+    compactionConfig.setWait(cl.hasOption(waitOpt.getOpt()));
+    compactionConfig.setStartRow(OptUtil.getStartRow(cl));
+    compactionConfig.setEndRow(OptUtil.getEndRow(cl));
     
     if (cl.hasOption(profileOpt.getOpt())) {
       List<IteratorSetting> iterators = shellState.iteratorProfiles.get(cl.getOptionValue(profileOpt.getOpt()));
@@ -97,11 +98,24 @@ public class CompactCommand extends TableOperation {
         return -1;
       }
       
-      this.iterators = new ArrayList<IteratorSetting>(iterators);
-    } else {
-      this.iterators = Collections.emptyList();
+      compactionConfig.setIterators(new ArrayList<>(iterators));
     }
 
+    if (cl.hasOption(strategyOpt.getOpt())) {
+      CompactionStrategyConfig csc = new CompactionStrategyConfig(cl.getOptionValue(strategyOpt.getOpt()));
+      if (cl.hasOption(strategyConfigOpt.getOpt())) {
+        Map<String,String> props = new HashMap<>();
+        String[] keyVals = cl.getOptionValue(strategyConfigOpt.getOpt()).split(",");
+        for (String keyVal : keyVals) {
+          String[] sa = keyVal.split("=");
+          props.put(sa[0], sa[1]);
+        }
+
+        csc.setOptions(props);
+      }
+
+      compactionConfig.setCompactionStrategy(csc);
+    }
 
     return super.execute(fullCommand, cl, shellState);
   }
@@ -121,6 +135,11 @@ public class CompactCommand extends TableOperation {
     profileOpt.setArgName("profile");
     opts.addOption(profileOpt);
 
+    strategyOpt = new Option("s", "strategy", true, "compaction strategy class name");
+    opts.addOption(strategyOpt);
+    strategyConfigOpt = new Option("sc", "strategyConfig", true, "Key value options for compaction strategy.  Expects <prop>=<value>{,<prop>=<value>}");
+    opts.addOption(strategyConfigOpt);
+
     cancelOpt = new Option(null, "cancel", false, "cancel user initiated compactions");
     opts.addOption(cancelOpt);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java b/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java
index 50e53a9..c075075 100644
--- a/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java
+++ b/test/src/test/java/org/apache/accumulo/proxy/SimpleProxyIT.java
@@ -69,6 +69,7 @@ import org.apache.accumulo.proxy.thrift.BatchScanOptions;
 import org.apache.accumulo.proxy.thrift.Column;
 import org.apache.accumulo.proxy.thrift.ColumnUpdate;
 import org.apache.accumulo.proxy.thrift.CompactionReason;
+import org.apache.accumulo.proxy.thrift.CompactionStrategyConfig;
 import org.apache.accumulo.proxy.thrift.CompactionType;
 import org.apache.accumulo.proxy.thrift.Condition;
 import org.apache.accumulo.proxy.thrift.ConditionalStatus;
@@ -245,7 +246,7 @@ public class SimpleProxyIT {
       fail("exception not thrown");
     } catch (TException ex) {}
     try {
-      client.compactTable(badLogin, table, null, null, null, true, false);
+      client.compactTable(badLogin, table, null, null, null, true, false, null);
       fail("exception not thrown");
     } catch (AccumuloSecurityException ex) {}
     try {
@@ -531,7 +532,7 @@ public class SimpleProxyIT {
       fail("exception not thrown");
     } catch (TableNotFoundException ex) {}
     try {
-      client.compactTable(creds, doesNotExist, null, null, null, true, false);
+      client.compactTable(creds, doesNotExist, null, null, null, true, false, null);
       fail("exception not thrown");
     } catch (TableNotFoundException ex) {}
     try {
@@ -874,7 +875,7 @@ public class SimpleProxyIT {
       public void run() {
         try {
           Client client2 = new TestProxyClient("localhost", proxyPort, protocolClass.newInstance()).proxy();
-          client2.compactTable(creds, "slow", null, null, null, true, true);
+          client2.compactTable(creds, "slow", null, null, null, true, true, null);
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
@@ -1126,7 +1127,7 @@ public class SimpleProxyIT {
     client.clearLocatorCache(creds, TABLE_TEST);
 
     // compact
-    client.compactTable(creds, TABLE_TEST, null, null, null, true, true);
+    client.compactTable(creds, TABLE_TEST, null, null, null, true, true, null);
     assertEquals(1, countFiles(TABLE_TEST));
     assertScan(expected, TABLE_TEST);
 
@@ -1141,7 +1142,7 @@ public class SimpleProxyIT {
     assertEquals(2, diskUsage.size());
     assertEquals(1, diskUsage.get(0).getTables().size());
     assertEquals(2, diskUsage.get(1).getTables().size());
-    client.compactTable(creds, TABLE_TEST2, null, null, null, true, true);
+    client.compactTable(creds, TABLE_TEST2, null, null, null, true, true, null);
     diskUsage = (client.getDiskUsage(creds, tablesToScan));
     assertEquals(3, diskUsage.size());
     assertEquals(1, diskUsage.get(0).getTables().size());
@@ -1591,4 +1592,40 @@ public class SimpleProxyIT {
     assertEquals(range.start.timestamp, range.start.timestamp);
     assertEquals(range.stop.timestamp, range.stop.timestamp);
   }
+
+  @Test
+  public void testCompactionStrategy() throws Exception {
+    final String tableName = makeTableName();
+
+    client.createTable(creds, tableName, true, TimeType.MILLIS);
+
+    client.setProperty(creds, Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "context1",
+        System.getProperty("user.dir") + "/src/test/resources/TestCompactionStrat.jar");
+    client.setTableProperty(creds, tableName, Property.TABLE_CLASSPATH.getKey(), "context1");
+
+    client.addSplits(creds, tableName, Collections.singleton(s2bb("efg")));
+
+    client.updateAndFlush(creds, tableName, mutation("a", "cf", "cq", "v1"));
+    client.flushTable(creds, tableName, null, null, true);
+
+    client.updateAndFlush(creds, tableName, mutation("b", "cf", "cq", "v2"));
+    client.flushTable(creds, tableName, null, null, true);
+
+    client.updateAndFlush(creds, tableName, mutation("y", "cf", "cq", "v1"));
+    client.flushTable(creds, tableName, null, null, true);
+
+    client.updateAndFlush(creds, tableName, mutation("z", "cf", "cq", "v2"));
+    client.flushTable(creds, tableName, null, null, true);
+
+    assertEquals(4, countFiles(tableName));
+
+    CompactionStrategyConfig csc = new CompactionStrategyConfig();
+
+    // The EfgCompactionStrat will only compact tablets with an end row of efg
+    csc.setClassName("org.apache.accumulo.test.EfgCompactionStrat");
+
+    client.compactTable(creds, tableName, null, null, null, true, true, csc);
+
+    assertEquals(3, countFiles(tableName));
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
index 544bb76..d878c7f 100644
--- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
@@ -54,8 +54,9 @@ import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.shell.Shell;
 import org.apache.accumulo.harness.SharedMiniClusterIT;
+import org.apache.accumulo.shell.Shell;
+import org.apache.accumulo.test.UserCompactionStrategyIT.TestCompactionStrategy;
 import org.apache.accumulo.test.functional.FunctionalTestUtils;
 import org.apache.accumulo.test.functional.SlowIterator;
 import org.apache.accumulo.tracer.TraceServer;
@@ -725,7 +726,7 @@ public class ShellServerIT extends SharedMiniClusterIT {
     // make two more files:
     ts.exec("insert m 1 2 3");
     ts.exec("flush -w");
-    ts.exec("insert n 1 2 3");
+    ts.exec("insert n 1 2 v901");
     ts.exec("flush -w");
     List<String> oldFiles = getFiles(tableId);
 
@@ -740,6 +741,14 @@ public class ShellServerIT extends SharedMiniClusterIT {
     ts.exec("merge --all -t " + table);
     ts.exec("compact -w");
     assertEquals(1, countFiles(tableId));
+
+    // test compaction strategy
+    ts.exec("insert z 1 2 v900");
+    ts.exec("compact -w -s " + TestCompactionStrategy.class.getName() + " -sc inputPrefix=F,dropPrefix=A");
+    assertEquals(1, countFiles(tableId));
+    ts.exec("scan", true, "v900", true);
+    ts.exec("scan", true, "v901", false);
+
     ts.exec("deletetable -f " + table);
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
new file mode 100644
index 0000000..5421f52
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/UserCompactionStrategyIT.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterIT;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.test.functional.FunctionalTestUtils;
+import org.apache.accumulo.tserver.compaction.CompactionPlan;
+import org.apache.accumulo.tserver.compaction.CompactionStrategy;
+import org.apache.accumulo.tserver.compaction.MajorCompactionRequest;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class UserCompactionStrategyIT extends AccumuloClusterIT {
+
+  public static class SizeCompactionStrategy extends CompactionStrategy {
+
+    private long size = 0;
+
+    @Override
+    public void init(Map<String,String> options) {
+      size = Long.parseLong(options.get("size"));
+    }
+
+    @Override
+    public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
+
+      for (DataFileValue dfv : request.getFiles().values())
+        if (dfv.getSize() < size)
+          return true;
+
+      return false;
+    }
+
+    @Override
+    public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
+      CompactionPlan plan = new CompactionPlan();
+
+      for (Entry<FileRef,DataFileValue> entry : request.getFiles().entrySet())
+        if (entry.getValue().getSize() < size)
+          plan.inputFiles.add(entry.getKey());
+
+      return plan;
+    }
+
+  }
+
+  public static class TestCompactionStrategy extends CompactionStrategy {
+
+    private String inputPrefix = "Z";
+    private String dropPrefix = "Z";
+    private boolean shouldCompact = false;
+
+    @Override
+    public void init(Map<String,String> options) {
+      if (options.containsKey("inputPrefix"))
+        inputPrefix = options.get("inputPrefix");
+      if (options.containsKey("dropPrefix"))
+        dropPrefix = options.get("dropPrefix");
+      if (options.containsKey("shouldCompact"))
+        shouldCompact = Boolean.parseBoolean(options.get("shouldCompact"));
+    }
+
+    @Override
+    public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
+      if (shouldCompact)
+        return true;
+
+      for (FileRef fref : request.getFiles().keySet()) {
+        if (fref.path().getName().startsWith(inputPrefix))
+          return true;
+        if (fref.path().getName().startsWith(dropPrefix))
+          return true;
+      }
+
+      return false;
+    }
+
+    @Override
+    public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException {
+      CompactionPlan plan = new CompactionPlan();
+
+      for (FileRef fref : request.getFiles().keySet()) {
+        if (fref.path().getName().startsWith(dropPrefix)) {
+          plan.deleteFiles.add(fref);
+        } else if (fref.path().getName().startsWith(inputPrefix)) {
+          plan.inputFiles.add(fref);
+        }
+      }
+
+      return plan;
+    }
+  }
+
+  @Test
+  public void testDropA() throws Exception {
+    Connector c = getConnector();
+
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+
+    writeFlush(c, tableName, "a");
+    writeFlush(c, tableName, "b");
+    // create a file that starts with A containing rows 'a' and 'b'
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+    writeFlush(c, tableName, "c");
+    writeFlush(c, tableName, "d");
+
+    // drop files that start with A
+    CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName());
+    csConfig.setOptions(ImmutableMap.of("dropPrefix", "A", "inputPrefix", "F"));
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+    Assert.assertEquals(ImmutableSet.of("c", "d"), getRows(c, tableName));
+
+    // this compaction should not drop files starting with A
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+    Assert.assertEquals(ImmutableSet.of("c", "d"), getRows(c, tableName));
+  }
+
+  private void testDropNone(Map<String,String> options) throws Exception {
+
+    Connector c = getConnector();
+
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+
+    writeFlush(c, tableName, "a");
+    writeFlush(c, tableName, "b");
+
+    CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName());
+    csConfig.setOptions(options);
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+    Assert.assertEquals(ImmutableSet.of("a", "b"), getRows(c, tableName));
+  }
+
+  @Test
+  public void testDropNone() throws Exception {
+    // test a compaction strategy that selects no files. In this case there is no work to do; we want to ensure the compaction does not hang.
+
+    testDropNone(ImmutableMap.of("inputPrefix", "Z"));
+  }
+
+  @Test
+  public void testDropNone2() throws Exception {
+    // test a compaction strategy that selects no files. This differs from testDropNone() in that shouldCompact() will return true and getCompactionPlan() will
+    // return no work to do.
+
+    testDropNone(ImmutableMap.of("inputPrefix", "Z", "shouldCompact", "true"));
+  }
+
+  @Test
+  public void testPerTableClasspath() throws Exception {
+    // test per-table classpath + user-specified compaction strategy
+
+    final Connector c = getConnector();
+    final String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.instanceOperations().setProperty(Property.VFS_CONTEXT_CLASSPATH_PROPERTY.getKey() + "context1",
+        System.getProperty("user.dir") + "/src/test/resources/TestCompactionStrat.jar");
+    c.tableOperations().setProperty(tableName, Property.TABLE_CLASSPATH.getKey(), "context1");
+
+    c.tableOperations().addSplits(tableName, new TreeSet<Text>(Arrays.asList(new Text("efg"))));
+
+    writeFlush(c, tableName, "a");
+    writeFlush(c, tableName, "b");
+
+    writeFlush(c, tableName, "h");
+    writeFlush(c, tableName, "i");
+
+    Assert.assertEquals(4, FunctionalTestUtils.countRFiles(c, tableName));
+
+    // EfgCompactionStrat will only compact a tablet w/ end row of 'efg'. No other tablets are compacted.
+    CompactionStrategyConfig csConfig = new CompactionStrategyConfig("org.apache.accumulo.test.EfgCompactionStrat");
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+    Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName));
+
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+    Assert.assertEquals(2, FunctionalTestUtils.countRFiles(c, tableName));
+  }
+
+  @Test
+  public void testIterators() throws Exception {
+    // test compaction strategy + iterators
+
+    Connector c = getConnector();
+
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+
+    writeFlush(c, tableName, "a");
+    writeFlush(c, tableName, "b");
+    // create a file that starts with A containing rows 'a' and 'b'
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+    writeFlush(c, tableName, "c");
+    writeFlush(c, tableName, "d");
+
+    Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName));
+
+    // compact only files that start with F; nothing is dropped because dropPrefix keeps its default value
+    CompactionStrategyConfig csConfig = new CompactionStrategyConfig(TestCompactionStrategy.class.getName());
+    csConfig.setOptions(ImmutableMap.of("inputPrefix", "F"));
+
+    IteratorSetting iterConf = new IteratorSetting(21, "myregex", RegExFilter.class);
+    RegExFilter.setRegexs(iterConf, "a|c", null, null, null, false);
+
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig).setIterators(Arrays.asList(iterConf)));
+
+    // the compaction strategy should only be applied to one file. If it's applied to both, then row 'b' would be dropped by the filter.
+    Assert.assertEquals(ImmutableSet.of("a", "b", "c"), getRows(c, tableName));
+
+    Assert.assertEquals(2, FunctionalTestUtils.countRFiles(c, tableName));
+
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+    // ensure that iterator is not applied
+    Assert.assertEquals(ImmutableSet.of("a", "b", "c"), getRows(c, tableName));
+
+    Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName));
+  }
+
+  @Test
+  public void testFileSize() throws Exception {
+    Connector c = getConnector();
+
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+
+    // write random data because it's very unlikely to compress
+    writeRandomValue(c, tableName, 1 << 16);
+    writeRandomValue(c, tableName, 1 << 16);
+
+    writeRandomValue(c, tableName, 1 << 9);
+    writeRandomValue(c, tableName, 1 << 7);
+    writeRandomValue(c, tableName, 1 << 6);
+
+    Assert.assertEquals(5, FunctionalTestUtils.countRFiles(c, tableName));
+
+    CompactionStrategyConfig csConfig = new CompactionStrategyConfig(SizeCompactionStrategy.class.getName());
+    csConfig.setOptions(ImmutableMap.of("size", "" + (1 << 15)));
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+    Assert.assertEquals(3, FunctionalTestUtils.countRFiles(c, tableName));
+
+    csConfig = new CompactionStrategyConfig(SizeCompactionStrategy.class.getName());
+    csConfig.setOptions(ImmutableMap.of("size", "" + (1 << 17)));
+    c.tableOperations().compact(tableName, new CompactionConfig().setWait(true).setCompactionStrategy(csConfig));
+
+    Assert.assertEquals(1, FunctionalTestUtils.countRFiles(c, tableName));
+
+  }
+
+  void writeRandomValue(Connector c, String tableName, int size) throws Exception {
+    Random rand = new Random();
+
+    byte data1[] = new byte[size];
+    rand.nextBytes(data1);
+
+    BatchWriter bw = c.createBatchWriter(tableName, new BatchWriterConfig());
+
+    Mutation m1 = new Mutation("r" + rand.nextInt(909090));
+    m1.put("data", "bl0b", new Value(data1));
+
+    bw.addMutation(m1);
+    bw.close();
+    c.tableOperations().flush(tableName, null, null, true);
+  }
+
+  private Set<String> getRows(Connector c, String tableName) throws TableNotFoundException {
+    Set<String> rows = new HashSet<String>();
+    Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY);
+
+    for (Entry<Key,Value> entry : scanner)
+      rows.add(entry.getKey().getRowData().toString());
+    return rows;
+
+  }
+
+  private void writeFlush(Connector conn, String tablename, String row) throws Exception {
+    BatchWriter bw = conn.createBatchWriter(tablename, new BatchWriterConfig());
+    Mutation m = new Mutation(row);
+    m.put("", "", "");
+    bw.addMutation(m);
+    bw.close();
+    conn.tableOperations().flush(tablename, null, null, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index 4e0721b..e4e7229 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -16,8 +16,6 @@
  */
 package org.apache.accumulo.test.functional;
 
-import static org.junit.Assert.assertFalse;
-
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -49,8 +47,19 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
+import static org.junit.Assert.assertFalse;
+
 public class FunctionalTestUtils {
 
+  public static int countRFiles(Connector c, String tableName) throws Exception {
+    Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    String tableId = c.tableOperations().tableIdMap().get(tableName);
+    scanner.setRange(MetadataSchema.TabletsSection.getRange(tableId));
+    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+
+    return count(scanner);
+  }
+
   static void checkRFiles(Connector c, String tableName, int minTablets, int maxTablets, int minRFiles, int maxRFiles) throws Exception {
     Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
     String tableId = c.tableOperations().tableIdMap().get(tableName);


Mime
View raw message