accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [1/2] git commit: ACCUMULO-1451 added strategy-specific configuration, documented the strategy, and added a specific call for shouldCompact
Date Tue, 22 Oct 2013 23:02:52 GMT
Updated Branches:
  refs/heads/master 896c49f52 -> 86669f733


ACCUMULO-1451 added strategy-specific configuration, documented the strategy, and added a
specific call for shouldCompact


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f57ca2b9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f57ca2b9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f57ca2b9

Branch: refs/heads/master
Commit: f57ca2b9e1c032bb266adf9db2487189f690d855
Parents: aceb5fb
Author: Eric Newton <eric.newton@gmail.com>
Authored: Tue Oct 22 19:03:03 2013 -0400
Committer: Eric Newton <eric.newton@gmail.com>
Committed: Tue Oct 22 19:03:03 2013 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java | 14 ++++++
 .../accumulo/server/tabletserver/Tablet.java    |  1 +
 .../TabletServerResourceManager.java            |  6 +--
 .../compaction/CompactionStrategy.java          | 46 ++++++++++++++++----
 .../compaction/DefaultCompactionStrategy.java   |  8 +++-
 .../compaction/MajorCompactionRequest.java      |  4 +-
 .../DefaultCompactionStrategyTest.java          |  2 +-
 .../test/ConfigurableMajorCompactionIT.java     |  6 ++-
 8 files changed, 69 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f57ca2b9/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 8542a38..c42395f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -19,7 +19,10 @@ package org.apache.accumulo.core.conf;
 import java.io.File;
 import java.lang.annotation.Annotation;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
@@ -367,6 +370,8 @@ public enum Property {
       "The ScanInterpreter class to apply on scan arguments in the shell"),
   TABLE_CLASSPATH("table.classpath.context", "", PropertyType.STRING, "Per table classpath
context"),
   TABLE_COMPACTION_STRATEGY("table.majc.compaction.strategy", "org.apache.accumulo.server.tabletserver.compaction.DefaultCompactionStrategy",
PropertyType.CLASSNAME, "A customizable major compaction strategy."),
+  TABLE_COMPACTION_STRATEGY_PREFIX("table.majc.compaction.strategy.opts.", null, PropertyType.PREFIX,
+      "Properties in this category are used to configure the compaction strategy."),
 
   // VFS ClassLoader properties
   VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY(AccumuloVFSClassLoader.VFS_CLASSLOADER_SYSTEM_CLASSPATH_PROPERTY,
"", PropertyType.STRING,
@@ -584,4 +589,13 @@ public enum Property {
     }
     return instance;
   }
+
+  public static Map<String,String> getCompactionStrategyOptions(AccumuloConfiguration
tableConf) {
+    Map<String,String> longNames = tableConf.getAllPropertiesWithPrefix(Property.TABLE_COMPACTION_STRATEGY_PREFIX);
+    Map<String,String> result = new HashMap<String, String>();
+    for (Entry<String,String> entry : longNames.entrySet()) {
+      result.put(entry.getKey().substring(0, Property.TABLE_COMPACTION_STRATEGY_PREFIX.getKey().length()),
entry.getValue());
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f57ca2b9/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index fc54cab..e5ca4d6 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -3110,6 +3110,7 @@ public class Tablet {
     
     // acquire file info outside of tablet lock
     CompactionStrategy strategy  = Property.createInstanceFromPropertyName(acuTableConf,
Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class, new DefaultCompactionStrategy());
+    strategy.init(Property.getCompactionStrategyOptions(acuTableConf));
     MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, fs, acuTableConf);
     request.setFiles(datafileManager.getDatafileSizes());
     strategy.gatherInformation(request);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f57ca2b9/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
index 381280d..cf2e1a5 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
@@ -564,13 +564,11 @@ public class TabletServerResourceManager {
         }
       }
       CompactionStrategy strategy  = Property.createInstanceFromPropertyName(tableConf, Property.TABLE_COMPACTION_STRATEGY,
CompactionStrategy.class, new DefaultCompactionStrategy());
+      strategy.init(Property.getCompactionStrategyOptions(tableConf));
       MajorCompactionRequest request = new MajorCompactionRequest(tablet.getExtent(), reason,
TabletServerResourceManager.this.fs, tableConf);
       request.setFiles(tabletFiles);
       try {
-        CompactionPlan plan = strategy.getCompactionPlan(request);
-        if (plan == null || plan.inputFiles.isEmpty())
-          return false;
-        return true;
+        return strategy.shouldCompact(request);
       } catch (IOException ex) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f57ca2b9/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
index 16e4db1..10f41cd 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionStrategy.java
@@ -17,25 +17,55 @@
 package org.apache.accumulo.server.tabletserver.compaction;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * The interface for customizing major compactions.
+ * <p>
+ * The tablet server has one thread to ask many tablets if they should compact. When the
strategy returns true, then tablet is added to the queue of tablets
+ * waiting for a compaction thread. Once a thread is available, the {@link #gatherInformation(MajorCompactionRequest)}
method is called outside the tablets' lock. This gives the strategy the
+ * ability to read information that maybe expensive to fetch. Once the gatherInformation
returns, the tablet lock is grabbed and the compactionPlan computed.
+ * This should *not* do expensive operations, especially not I/O. Note that the number of
files may change between calls to {@link #gatherInformation(MajorCompactionRequest)} and
+ * {@link #getCompactionPlan(MajorCompactionRequest)}.
+ * <p>
+ * <b>Note:</b> the strategy object used for the {@link #shouldCompact(MajorCompactionRequest)}
call is going to be different from the one used in the compaction thread.
  */
 public abstract class CompactionStrategy {
   
   /**
+   * The settings for the compaction strategy pulled from zookeeper.  The <tt>table.compacations.major.strategy.opts</tt>
part of the setting will be removed.
+   * 
+   * @param options
+   */
+  public void init(Map<String,String> options) {}
+  
+  /**
+   * Determine if this tablet is eligible for a major compaction. It's ok if it later determines
(through {@link #gatherInformation(MajorCompactionRequest)} and
+   * {@link #getCompactionPlan(MajorCompactionRequest)}) that it does not need to. Any state
stored during shouldCompact will no longer exist when
+   * {@link #gatherInformation(MajorCompactionRequest)} and {@link #getCompactionPlan(MajorCompactionRequest)}
are called.
+   * 
+   * @param request
+   * @return
+   * @throws IOException
+   */
+  public abstract boolean shouldCompact(MajorCompactionRequest request) throws IOException;
+  
+  /**
    * Called prior to obtaining the tablet lock, useful for examining metadata or indexes.
-   * @param request basic details about the tablet
+   * State collected during this method will be available during the call the {@link #getCompactionPlan(MajorCompactionRequest)}.
+   * 
+   * @param request
+   *          basic details about the tablet
    * @throws IOException
    */
-  public void gatherInformation(MajorCompactionRequest request) throws IOException {
-    
-  }
+  public void gatherInformation(MajorCompactionRequest request) throws IOException {}
   
-  /** 
-   * Get the plan for compacting a tablets files.  Called while holding the tablet lock,
so it should not be doing any blocking.
-   * @param request basic details about the tablet
-   * @return the plan for a major compaction
+  /**
+   * Get the plan for compacting a tablets files. Called while holding the tablet lock, so
it should not be doing any blocking.
+   * 
+   * @param request
+   *          basic details about the tablet
+   * @return the plan for a major compaction, or null to cancel the compaction.
    * @throws IOException
    */
   abstract public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws
IOException;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f57ca2b9/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java
index 5aa0f98..7d5f65d 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategy.java
@@ -30,7 +30,11 @@ import org.apache.accumulo.server.fs.FileRef;
 
 public class DefaultCompactionStrategy extends CompactionStrategy {
   
-  
+  @Override
+  public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
+    return getCompactionPlan(request) != null;
+  }
+ 
   @Override
   public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException
{
     CompactionPlan result = new CompactionPlan();
@@ -137,4 +141,6 @@ public class DefaultCompactionStrategy extends CompactionStrategy {
     
     return files;
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f57ca2b9/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
index aa3df9d..cde9f29 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/MajorCompactionRequest.java
@@ -70,12 +70,12 @@ public class MajorCompactionRequest {
     this.files = Collections.unmodifiableMap(update);
   }
   
-  FileStatus[] listStatus(Path path) throws IOException {
+  public FileStatus[] listStatus(Path path) throws IOException {
     // @TODO verify the file isn't some random file in HDFS
     return volumeManager.listStatus(path);
   }
   
-  FileSKVIterator openReader(FileRef ref) throws IOException {
+  public FileSKVIterator openReader(FileRef ref) throws IOException {
     // @TODO verify the file isn't some random file in HDFS
     // @TODO ensure these files are always closed?
     FileOperations fileFactory = FileOperations.getInstance();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f57ca2b9/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java
b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java
index a11bd66..04ca127 100644
--- a/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java
+++ b/server/src/test/java/org/apache/accumulo/server/tabletserver/compaction/DefaultCompactionStrategyTest.java
@@ -144,7 +144,7 @@ public class DefaultCompactionStrategyTest {
   static final DefaultConfiguration dfault = AccumuloConfiguration.getDefaultConfiguration();
   private static class TestCompactionRequest extends MajorCompactionRequest {
     @Override
-    FileSKVIterator openReader(FileRef ref) throws IOException {
+    public FileSKVIterator openReader(FileRef ref) throws IOException {
       return new TestFileSKVIterator(ref.toString());
     }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f57ca2b9/test/src/test/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java
b/test/src/test/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java
index 9bc31fa..e5b1610 100644
--- a/test/src/test/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java
@@ -55,10 +55,12 @@ public class ConfigurableMajorCompactionIT extends ConfigurableMacIT {
 
   public static class TestCompactionStrategy extends CompactionStrategy {
    
+    public boolean shouldCompact(MajorCompactionRequest request) throws IOException {
+      return request.getFiles().size() == 5;
+    }
+    
     @Override
     public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException
{
-      if (request.getFiles().size() != 5)
-        return null;
       CompactionPlan plan = new CompactionPlan();
       plan.inputFiles.addAll(request.getFiles().keySet());
       plan.writeParameters = new WriteParameters();


Mime
View raw message