accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1435371 [3/3] - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/mock/ core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/ core/src/main/java/org/...
Date Fri, 18 Jan 2013 22:06:45 GMT
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java?rev=1435371&r1=1435370&r2=1435371&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java Fri Jan
18 22:06:44 2013
@@ -104,6 +104,7 @@ import org.apache.accumulo.core.util.she
 import org.apache.accumulo.core.util.shell.commands.InfoCommand;
 import org.apache.accumulo.core.util.shell.commands.InsertCommand;
 import org.apache.accumulo.core.util.shell.commands.InterpreterCommand;
+import org.apache.accumulo.core.util.shell.commands.ListCompactionsCommand;
 import org.apache.accumulo.core.util.shell.commands.ListIterCommand;
 import org.apache.accumulo.core.util.shell.commands.ListScansCommand;
 import org.apache.accumulo.core.util.shell.commands.MaxRowCommand;
@@ -293,7 +294,7 @@ public class Shell extends ShellOptions 
     
     Command[] dataCommands = {new DeleteCommand(), new DeleteManyCommand(), new DeleteRowsCommand(),
new EGrepCommand(), new FormatterCommand(),
         new InterpreterCommand(), new GrepCommand(), new ImportDirectoryCommand(), new InsertCommand(),
new MaxRowCommand(), new ScanCommand()};
-    Command[] debuggingCommands = {new ClasspathCommand(), new DebugCommand(), new ListScansCommand(),
new TraceCommand()};
+    Command[] debuggingCommands = {new ClasspathCommand(), new DebugCommand(), new ListScansCommand(),
new ListCompactionsCommand(), new TraceCommand()};
     Command[] execCommands = {new ExecfileCommand(), new HistoryCommand()};
     Command[] exitCommands = {new ByeCommand(), new ExitCommand(), new QuitCommand()};
     Command[] helpCommands = {new AboutCommand(), new HelpCommand(), new InfoCommand(), new
QuestionCommand()};

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ActiveCompactionIterator.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ActiveCompactionIterator.java?rev=1435371&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ActiveCompactionIterator.java
(added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ActiveCompactionIterator.java
Fri Jan 18 22:06:44 2013
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.shell.commands;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.ActiveCompaction;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.util.Duration;
+
+class ActiveCompactionIterator implements Iterator<String> {
+  
+  private InstanceOperations instanceOps;
+  private Iterator<String> tsIter;
+  private Iterator<String> compactionIter;
+  
+  private static String maxDecimal(double count) {
+    if (count < 9.995)
+      return String.format("%.2f", count);
+    if (count < 99.95)
+      return String.format("%.1f", count);
+    return String.format("%.0f", count);
+  }
+
+  private static String shortenCount(long count) {
+    if (count < 1000)
+      return count + "";
+    if (count < 1000000)
+      return maxDecimal(count / 1000.0) + "K";
+    if (count < 1000000000)
+      return maxDecimal(count / 1000000.0) + "M";
+    return maxDecimal(count / 1000000000.0) + "B";
+  }
+
+  private void readNext() {
+    final List<String> compactions = new ArrayList<String>();
+    
+    while (tsIter.hasNext()) {
+      
+      final String tserver = tsIter.next();
+      try {
+        List<ActiveCompaction> acl = instanceOps.getActiveCompactions(tserver);
+        
+        acl = new ArrayList<ActiveCompaction>(acl);
+        
+        Collections.sort(acl, new Comparator<ActiveCompaction>() {
+          @Override
+          public int compare(ActiveCompaction o1, ActiveCompaction o2) {
+            return (int) (o2.getAge() - o1.getAge());
+          }
+        });
+
+        for (ActiveCompaction ac : acl) {
+          String output = ac.getOutputFile();
+          int index = output.indexOf("tables");
+          if (index > 0) {
+            output = output.substring(index + 6);
+          }
+          
+          ac.getIterators();
+          
+          List<String> iterList = new ArrayList<String>();
+          Map<String,Map<String,String>> iterOpts = new HashMap<String,Map<String,String>>();
+          for (IteratorSetting is : ac.getIterators()) {
+            iterList.add(is.getName() + "=" + is.getPriority() + "," + is.getIteratorClass());
+            iterOpts.put(is.getName(), is.getOptions());
+          }
+
+          compactions.add(String.format("%21s | %9s | %5s | %6s | %5s | %5s | %15s | %-40s
| %5s | %35s | %9s | %s", tserver,
+              Duration.format(ac.getAge(), "", "-"), ac.getType(), ac.getReason(), shortenCount(ac.getEntriesRead()),
shortenCount(ac.getEntriesWritten()),
+              ac.getTable(), ac.getExtent(), ac.getInputFiles(),
+              output, iterList, iterOpts));
+        }
+      } catch (Exception e) {
+        compactions.add(tserver + " ERROR " + e.getMessage());
+      }
+      
+      if (compactions.size() > 0) {
+        break;
+      }
+    }
+    
+    compactionIter = compactions.iterator();
+  }
+  
+  ActiveCompactionIterator(List<String> tservers, InstanceOperations instanceOps) {
+    this.instanceOps = instanceOps;
+    this.tsIter = tservers.iterator();
+    
+    final String header = String.format(" %-21s| %-9s | %-5s | %-6s | %-5s | %-5s | %-15s
| %-40s | %-5s | %-35s | %-9s | %s", "TABLET SERVER", "AGE", "TYPE",
+        "REASON",
+ "READ", "WROTE", "TABLE", "TABLET", "INPUT", "OUTPUT", "ITERATORS", "ITERATOR OPTIONS");
+    
+    compactionIter = Collections.singletonList(header).iterator();
+  }
+  
+  @Override
+  public boolean hasNext() {
+    return compactionIter.hasNext();
+  }
+  
+  @Override
+  public String next() {
+    final String next = compactionIter.next();
+    
+    if (!compactionIter.hasNext())
+      readNext();
+    
+    return next;
+  }
+  
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+  
+}

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListCompactionsCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListCompactionsCommand.java?rev=1435371&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListCompactionsCommand.java
(added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListCompactionsCommand.java
Fri Jan 18 22:06:44 2013
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.shell.commands;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.client.admin.InstanceOperations;
+import org.apache.accumulo.core.util.shell.Shell;
+import org.apache.accumulo.core.util.shell.Shell.Command;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+
+public class ListCompactionsCommand extends Command {
+  
+  private Option tserverOption, disablePaginationOpt;
+  
+  @Override
+  public String description() {
+    return "lists what compactions are currently running in accumulo. See the accumulo.core.client.admin.ActiveCompaciton
javadoc for more information about columns.";
+  }
+  
+  @Override
+  public int execute(final String fullCommand, final CommandLine cl, final Shell shellState)
throws Exception {
+    
+    List<String> tservers;
+    
+    final InstanceOperations instanceOps = shellState.getConnector().instanceOperations();
+    
+    final boolean paginate = !cl.hasOption(disablePaginationOpt.getOpt());
+    
+    if (cl.hasOption(tserverOption.getOpt())) {
+      tservers = new ArrayList<String>();
+      tservers.add(cl.getOptionValue(tserverOption.getOpt()));
+    } else {
+      tservers = instanceOps.getTabletServers();
+    }
+    
+    shellState.printLines(new ActiveCompactionIterator(tservers, instanceOps), paginate);
+    
+    return 0;
+  }
+  
+  @Override
+  public int numArgs() {
+    return 0;
+  }
+  
+  @Override
+  public Options getOptions() {
+    final Options opts = new Options();
+    
+    tserverOption = new Option("ts", "tabletServer", true, "tablet server to list compactions
for");
+    tserverOption.setArgName("tablet server");
+    opts.addOption(tserverOption);
+    
+    disablePaginationOpt = new Option("np", "no-pagination", false, "disable pagination of
output");
+    opts.addOption(disablePaginationOpt);
+    
+    return opts;
+  }
+  
+}

Modified: accumulo/trunk/core/src/main/thrift/tabletserver.thrift
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/thrift/tabletserver.thrift?rev=1435371&r1=1435370&r2=1435371&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/thrift/tabletserver.thrift (original)
+++ accumulo/trunk/core/src/main/thrift/tabletserver.thrift Fri Jan 18 22:06:44 2013
@@ -87,6 +87,35 @@ struct ActiveScan {
     13:list<binary> authorizations
 }
 
+enum CompactionType {
+   MINOR,
+   MERGE,
+   MAJOR,
+   FULL
+}
+
+enum CompactionReason {
+   USER,
+   SYSTEM,
+   CHOP,
+   IDLE,
+   CLOSE
+}
+
+struct ActiveCompaction {
+    1:data.TKeyExtent extent
+    2:i64 age
+    3:i32 inputFiles
+    4:string outputFile
+    5:CompactionType type
+    6:CompactionReason reason
+    7:string localityGroup
+    8:i64 entriesRead
+    9:i64 entriesWritten
+    10:list<data.IterInfo> ssiList
+    11:map<string, map<string, string>> ssio 
+}
+
 struct TIteratorSetting {
     1:i32 priority;
     2:string name;
@@ -157,6 +186,7 @@ service TabletClientService extends clie
   oneway void fastHalt(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string
lock);
   
   list<ActiveScan> getActiveScans(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials)
throws (1:security.ThriftSecurityException sec)
+  list<ActiveCompaction> getActiveCompactions(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo
credentials) throws (1:security.ThriftSecurityException sec)
   oneway void removeLogs(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials, 3:list<string>
filenames)
 }
 

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1435371&r1=1435370&r2=1435371&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
Fri Jan 18 22:06:44 2013
@@ -18,6 +18,8 @@ package org.apache.accumulo.server.table
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +34,7 @@ import org.apache.accumulo.core.data.Byt
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.thrift.IterInfo;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileSKVWriter;
@@ -43,6 +46,9 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.iterators.system.DeletingIterator;
 import org.apache.accumulo.core.iterators.system.MultiIterator;
 import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
+import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
@@ -51,6 +57,8 @@ import org.apache.accumulo.server.proble
 import org.apache.accumulo.server.problems.ProblemReportingIterator;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
+import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -82,8 +90,133 @@ public class Compactor implements Callab
   protected KeyExtent extent;
   private List<IteratorSetting> iterators;
   
+  // things to report
+  private String currentLocalityGroup = "";
+  private long startTime;
+  private long currentEntriesRead = 0;
+  private long currentEntriesWritten = 0;
+  private long totalEntriesRead = 0;
+  private long totalEntriesWritten = 0;
+  private MajorCompactionReason reason;
+  protected MinorCompactionReason mincReason;
+  
+  private synchronized void updateStats(long read, long written) {
+    this.currentEntriesRead = read;
+    this.currentEntriesWritten = written;
+  }
+
+  private synchronized void clearStats() {
+    totalEntriesRead = 0;
+    totalEntriesWritten = 0;
+    currentEntriesRead = 0;
+    currentEntriesWritten = 0;
+    currentLocalityGroup = "";
+  }
+
+  private synchronized void rollStats() {
+    this.totalEntriesRead = currentEntriesRead;
+    this.totalEntriesWritten = currentEntriesWritten;
+    currentEntriesRead = 0;
+    currentEntriesWritten = 0;
+  }
+  
+  private synchronized void setLocalityGroup(String name) {
+    this.currentLocalityGroup = name;
+  }
+
+  protected static Set<Compactor> runningCompactions = Collections.synchronizedSet(new
HashSet<Compactor>());
+  
+  public static class CompactionInfo {
+    
+    private Compactor compactor;
+    private String localityGroup;
+    private long entriesRead;
+    private long entriesWritten;
+    
+    CompactionInfo(Compactor compactor) {
+      // get a consistent snapshot of changing stats
+      synchronized (compactor) {
+        this.localityGroup = compactor.currentLocalityGroup;
+        this.entriesRead = compactor.totalEntriesRead + compactor.currentEntriesRead;
+        this.entriesWritten = compactor.totalEntriesWritten + compactor.currentEntriesWritten;
+      }
+      
+      this.compactor = compactor;
+    }
+
+    public ActiveCompaction toThrift() {
+      
+      CompactionType type;
+      
+      if (compactor.imm != null)
+        if (compactor.filesToCompact.size() > 0)
+          type = CompactionType.MERGE;
+        else
+          type = CompactionType.MINOR;
+      else if (!compactor.propogateDeletes)
+        type = CompactionType.FULL;
+      else
+        type = CompactionType.MAJOR;
+      
+      CompactionReason reason;
+      
+      if (compactor.imm != null)
+        switch(compactor.mincReason){
+          case USER:
+            reason = CompactionReason.USER;
+            break;
+          case CLOSE:
+            reason = CompactionReason.CLOSE;
+            break;
+          case SYSTEM:
+          default:
+            reason = CompactionReason.SYSTEM;
+            break;
+        }
+      else
+        switch (compactor.reason) {
+          case USER:
+            reason = CompactionReason.USER;
+            break;
+          case CHOP:
+            reason = CompactionReason.CHOP;
+            break;
+          case IDLE:
+            reason = CompactionReason.IDLE;
+            break;
+          case NORMAL:
+          default:
+            reason = CompactionReason.SYSTEM;
+            break;
+        }
+      
+      List<IterInfo> iiList = new ArrayList<IterInfo>();
+      Map<String,Map<String,String>> iterOptions = new HashMap<String,Map<String,String>>();
+      
+      for (IteratorSetting iterSetting : compactor.iterators) {
+        iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(),
iterSetting.getName()));
+        iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
+      }
+      
+      return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis()
- compactor.startTime, compactor.filesToCompact.size(),
+          compactor.outputFile, type, reason, localityGroup, entriesRead, entriesWritten,
iiList, iterOptions);
+    }
+  }
+  
+  public static List<CompactionInfo> getRunningCompactions() {
+    ArrayList<CompactionInfo> compactions = new ArrayList<Compactor.CompactionInfo>();
+    
+    synchronized (runningCompactions) {
+      for (Compactor compactor : runningCompactions) {
+        compactions.add(new CompactionInfo(compactor));
+      }
+    }
+    
+    return compactions;
+  }
+
   Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap
imm, String outputFile, boolean propogateDeletes,
-      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting>
iterators) {
+      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting>
iterators, MajorCompactionReason reason) {
     this.extent = extent;
     this.conf = conf;
     this.fs = fs;
@@ -94,11 +227,14 @@ public class Compactor implements Callab
     this.acuTableConf = acuTableConf;
     this.env = env;
     this.iterators = iterators;
+    this.reason = reason;
+    
+    startTime = System.currentTimeMillis();
   }
   
   Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap
imm, String outputFile, boolean propogateDeletes,
       TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
-    this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new
ArrayList<IteratorSetting>());
+    this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new
ArrayList<IteratorSetting>(), null);
   }
   
   public FileSystem getFileSystem() {
@@ -119,7 +255,11 @@ public class Compactor implements Callab
     FileSKVWriter mfw = null;
     
     CompactionStats majCStats = new CompactionStats();
+
+    boolean remove = runningCompactions.add(this);
     
+    clearStats();
+
     try {
       FileOperations fileFactory = FileOperations.getInstance();
       mfw = fileFactory.openWriter(outputFile, fs, conf, acuTableConf);
@@ -137,11 +277,13 @@ public class Compactor implements Callab
       
       if (mfw.supportsLocalityGroups()) {
         for (Entry<String,Set<ByteSequence>> entry : lGroups.entrySet()) {
+          setLocalityGroup(entry.getKey());
           compactLocalityGroup(entry.getKey(), entry.getValue(), true, mfw, majCStats);
           allColumnFamilies.addAll(entry.getValue());
         }
       }
       
+      setLocalityGroup("");
       compactLocalityGroup(null, allColumnFamilies, false, mfw, majCStats);
       
       long t2 = System.currentTimeMillis();
@@ -171,6 +313,10 @@ public class Compactor implements Callab
       log.error(e, e);
       throw e;
     } finally {
+      
+      if (remove)
+        runningCompactions.remove(this);
+
       try {
         if (mfw != null) {
           // compaction must not have finished successfully, so close its output file
@@ -281,8 +427,15 @@ public class Compactor implements Callab
           mfw.append(itr.getTopKey(), itr.getTopValue());
           itr.next();
           entriesCompacted++;
+          
+          if (entriesCompacted % 1024 == 0) {
+            // Periodically update stats, do not want to do this too often since its syncronized
+            updateStats(citr.getCount(), entriesCompacted);
+          }
         }
         
+        rollStats();
+
         if (itr.hasTop() && !env.isCompactionEnabled()) {
           // cancel major compaction operation
           try {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java?rev=1435371&r1=1435370&r2=1435371&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
Fri Jan 18 22:06:44 2013
@@ -33,6 +33,7 @@ import org.apache.accumulo.server.conf.T
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,7 +53,7 @@ public class MinorCompactor extends Comp
   }
   
   MinorCompactor(Configuration conf, FileSystem fs, InMemoryMap imm, String mergeFile, DataFileValue
dfv, String outputFile, TableConfiguration acuTableConf,
-      KeyExtent extent) {
+      KeyExtent extent, MinorCompactionReason mincReason) {
     super(conf, fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent,
new CompactionEnv() {
       
       @Override
@@ -65,6 +66,8 @@ public class MinorCompactor extends Comp
         return IteratorScope.minc;
       }
     });
+    
+    super.mincReason = mincReason;
   }
   
   private boolean isTableDeleting() {
@@ -86,52 +89,57 @@ public class MinorCompactor extends Comp
     int maxSleepTime = 1000 * Constants.DEFAULT_MINOR_COMPACTION_MAX_SLEEP_TIME;
     boolean reportedProblem = false;
     
-    do {
-      try {
-        CompactionStats ret = super.call();
+    runningCompactions.add(this);
+    try {
+      do {
+        try {
+          CompactionStats ret = super.call();
+          
+          // log.debug(String.format("MinC %,d recs in | %,d recs out | %,d recs/sec | %6.3f
secs | %,d bytes ",map.size(), entriesCompacted,
+          // (int)(map.size()/((t2 - t1)/1000.0)), (t2 - t1)/1000.0, estimatedSizeInBytes()));
+          
+          if (reportedProblem) {
+            ProblemReports.getInstance().deleteProblemReport(getExtent().getTableId().toString(),
ProblemType.FILE_WRITE, getOutputFile());
+          }
+          
+          return ret;
+        } catch (IOException e) {
+          log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() +
" retrying ...");
+          ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(),
ProblemType.FILE_WRITE, getOutputFile(), e));
+          reportedProblem = true;
+        } catch (RuntimeException e) {
+          // if this is coming from a user iterator, it is possible that the user could change
the iterator config and that the
+          // minor compaction would succeed
+          log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() +
" retrying ...", e);
+          ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(),
ProblemType.FILE_WRITE, getOutputFile(), e));
+          reportedProblem = true;
+        } catch (CompactionCanceledException e) {
+          throw new IllegalStateException(e);
+        }
         
-        // log.debug(String.format("MinC %,d recs in | %,d recs out | %,d recs/sec | %6.3f
secs | %,d bytes ",map.size(), entriesCompacted,
-        // (int)(map.size()/((t2 - t1)/1000.0)), (t2 - t1)/1000.0, estimatedSizeInBytes()));
+        Random random = new Random();
         
-        if (reportedProblem) {
-          ProblemReports.getInstance().deleteProblemReport(getExtent().getTableId().toString(),
ProblemType.FILE_WRITE, getOutputFile());
-        }
+        int sleep = sleepTime + random.nextInt(sleepTime);
+        log.debug("MinC failed sleeping " + sleep + " ms before retrying");
+        UtilWaitThread.sleep(sleep);
+        sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor));
         
-        return ret;
-      } catch (IOException e) {
-        log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + "
retrying ...");
-        ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(),
ProblemType.FILE_WRITE, getOutputFile(), e));
-        reportedProblem = true;
-      } catch (RuntimeException e) {
-        // if this is coming from a user iterator, it is possible that the user could change
the iterator config and that the
-        // minor compaction would succeed
-        log.warn("MinC failed (" + e.getMessage() + ") to create " + getOutputFile() + "
retrying ...", e);
-        ProblemReports.getInstance().report(new ProblemReport(getExtent().getTableId().toString(),
ProblemType.FILE_WRITE, getOutputFile(), e));
-        reportedProblem = true;
-      } catch (CompactionCanceledException e) {
-        throw new IllegalStateException(e);
-      }
-      
-      Random random = new Random();
-      
-      int sleep = sleepTime + random.nextInt(sleepTime);
-      log.debug("MinC failed sleeping " + sleep + " ms before retrying");
-      UtilWaitThread.sleep(sleep);
-      sleepTime = (int) Math.round(Math.min(maxSleepTime, sleepTime * growthFactor));
-      
-      // clean up
-      try {
-        if (getFileSystem().exists(new Path(getOutputFile()))) {
-          getFileSystem().delete(new Path(getOutputFile()), true);
+        // clean up
+        try {
+          if (getFileSystem().exists(new Path(getOutputFile()))) {
+            getFileSystem().delete(new Path(getOutputFile()), true);
+          }
+        } catch (IOException e) {
+          log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());
         }
-      } catch (IOException e) {
-        log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());
-      }
-      
-      if (isTableDeleting())
-        return new CompactionStats(0, 0);
-
-    } while (true);
+        
+        if (isTableDeleting())
+          return new CompactionStats(0, 0);
+        
+      } while (true);
+    } finally {
+      runningCompactions.remove(this);
+    }
   }
   
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1435371&r1=1435370&r2=1435371&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
Fri Jan 18 22:06:44 2013
@@ -149,12 +149,16 @@ public class Tablet {
   enum MajorCompactionReason {
     // do not change the order, the order of this enum determines the order
     // in which queued major compactions are executed
-    ALL,
+    USER,
     CHOP,
     NORMAL,
     IDLE
   }
   
+  enum MinorCompactionReason {
+    USER, SYSTEM, CLOSE
+  }
+
   public class CommitSession {
     
     private int seq;
@@ -2123,7 +2127,7 @@ public class Tablet {
   }
   
   private DataFileValue minorCompact(Configuration conf, FileSystem fs, InMemoryMap memTable,
String tmpDatafile, String newDatafile, String mergeFile,
-      boolean hasQueueTime, long queued, CommitSession commitSession, long flushId) {
+      boolean hasQueueTime, long queued, CommitSession commitSession, long flushId, MinorCompactionReason
mincReason) {
     boolean failed = false;
     long start = System.currentTimeMillis();
     timer.incrementStatusMinor();
@@ -2138,7 +2142,7 @@ public class Tablet {
       if (mergeFile != null)
         dfv = datafileManager.getDatafileSizes().get(mergeFile);
       
-      MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile,
acuTableConf, extent);
+      MinorCompactor compactor = new MinorCompactor(conf, fs, memTable, mergeFile, dfv, tmpDatafile,
acuTableConf, extent, mincReason);
       CompactionStats stats = compactor.call();
       
       span.stop();
@@ -2182,13 +2186,15 @@ public class Tablet {
     private DataFileValue stats;
     private String mergeFile;
     private long flushId;
+    private MinorCompactionReason mincReason;
     
-    MinorCompactionTask(String mergeFile, CommitSession commitSession, long flushId) {
+    MinorCompactionTask(String mergeFile, CommitSession commitSession, long flushId, MinorCompactionReason
mincReason) {
       queued = System.currentTimeMillis();
       minorCompactionWaitingToStart = true;
       this.commitSession = commitSession;
       this.mergeFile = mergeFile;
       this.flushId = flushId;
+      this.mincReason = mincReason;
     }
     
     public void run() {
@@ -2219,7 +2225,7 @@ public class Tablet {
         span.stop();
         span = Trace.start("compact");
         this.stats = minorCompact(conf, fs, tabletMemory.getMinCMemTable(), newMapfileLocation
+ "_tmp", newMapfileLocation, mergeFile, true, queued,
-            commitSession, flushId);
+            commitSession, flushId, mincReason);
         span.stop();
         
         if (needsSplit()) {
@@ -2240,14 +2246,14 @@ public class Tablet {
     }
   }
   
-  private synchronized MinorCompactionTask prepareForMinC(long flushId) {
+  private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason
mincReason) {
     CommitSession oldCommitSession = tabletMemory.prepareForMinC();
     otherLogs = currentLogs;
     currentLogs = new HashSet<DfsLogger>();
     
     String mergeFile = datafileManager.reserveMergingMinorCompactionFile();
     
-    return new MinorCompactionTask(mergeFile, oldCommitSession, flushId);
+    return new MinorCompactionTask(mergeFile, oldCommitSession, flushId, mincReason);
     
   }
   
@@ -2283,7 +2289,7 @@ public class Tablet {
         // a race condition
         MetadataTable.updateTabletFlushID(extent, tableFlushID, creds, tabletServer.getLock());
       } else if (initiateMinor)
-        initiateMinorCompaction(tableFlushID);
+        initiateMinorCompaction(tableFlushID, MinorCompactionReason.USER);
       
     } finally {
       if (updateMetadata) {
@@ -2296,7 +2302,7 @@ public class Tablet {
     
   }
   
-  boolean initiateMinorCompaction() {
+  boolean initiateMinorCompaction(MinorCompactionReason mincReason) {
     if (isClosed()) {
       // don't bother trying to get flush id if closed... could be closed after this check
but that is ok... just trying to cut down on uneeded log messages....
       return false;
@@ -2310,10 +2316,10 @@ public class Tablet {
       log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " " +
e.getMessage());
       return false;
     }
-    return initiateMinorCompaction(flushId);
+    return initiateMinorCompaction(flushId, mincReason);
   }
   
-  boolean minorCompactNow() {
+  boolean minorCompactNow(MinorCompactionReason mincReason) {
     long flushId;
     try {
       flushId = getFlushID();
@@ -2321,22 +2327,22 @@ public class Tablet {
       log.info("Asked to initiate MinC when there was no flush id " + getExtent() + " " +
e.getMessage());
       return false;
     }
-    MinorCompactionTask mct = createMinorCompactionTask(flushId);
+    MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason);
     if (mct == null)
       return false;
     mct.run();
     return true;
   }
 
-  boolean initiateMinorCompaction(long flushId) {
-    MinorCompactionTask mct = createMinorCompactionTask(flushId);
+  boolean initiateMinorCompaction(long flushId, MinorCompactionReason mincReason) {
+    MinorCompactionTask mct = createMinorCompactionTask(flushId, mincReason);
     if (mct == null)
       return false;
     tabletResources.executeMinorCompaction(mct);
     return true;
   }
   
-  private MinorCompactionTask createMinorCompactionTask(long flushId) {
+  private MinorCompactionTask createMinorCompactionTask(long flushId, MinorCompactionReason
mincReason) {
     MinorCompactionTask mct;
     long t1, t2;
     
@@ -2371,7 +2377,7 @@ public class Tablet {
           return null;
         }
         
-        mct = prepareForMinC(flushId);
+        mct = prepareForMinC(flushId, mincReason);
         t2 = System.currentTimeMillis();
       }
     } finally {
@@ -2647,7 +2653,7 @@ public class Tablet {
       tabletMemory.waitForMinC();
       
       try {
-        mct = prepareForMinC(getFlushID());
+        mct = prepareForMinC(getFlushID(), MinorCompactionReason.CLOSE);
       } catch (NoNodeException e) {
         throw new RuntimeException(e);
       }
@@ -2700,7 +2706,7 @@ public class Tablet {
     
     if (saveState && tabletMemory.getMemTable().getNumEntries() > 0) {
       try {
-        prepareForMinC(getFlushID()).run();
+        prepareForMinC(getFlushID(), MinorCompactionReason.CLOSE).run();
       } catch (NoNodeException e) {
         throw new RuntimeException(e);
       }
@@ -2864,7 +2870,7 @@ public class Tablet {
       if (cmp != 0)
         return cmp;
       
-      if (reason == MajorCompactionReason.ALL || reason == MajorCompactionReason.CHOP) {
+      if (reason == MajorCompactionReason.USER || reason == MajorCompactionReason.CHOP) {
         // for these types of compactions want to do the oldest first
         cmp = (int) (queued - o.queued);
         if (cmp != 0)
@@ -2895,7 +2901,7 @@ public class Tablet {
   public boolean needsMajorCompaction(MajorCompactionReason reason) {
     if (majorCompactionInProgress)
       return false;
-    if (reason == MajorCompactionReason.CHOP || reason == MajorCompactionReason.ALL)
+    if (reason == MajorCompactionReason.CHOP || reason == MajorCompactionReason.USER)
       return true;
     return tabletResources.needsMajorCompaction(datafileManager.getDatafileSizes(), reason);
   }
@@ -3246,8 +3252,8 @@ public class Tablet {
               + datafileManager.abs2rel(new Path(compactTmpName)));
 
           // always propagate deletes, unless last batch
-          Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size()
== 0 ? propogateDeletes : true,
-              acuTableConf, extent, cenv, compactionIterators);
+          Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, filesToCompact.size()
== 0 ? propogateDeletes : true, acuTableConf, extent,
+              cenv, compactionIterators, reason);
           
           CompactionStats mcs = compactor.call();
           
@@ -3816,7 +3822,7 @@ public class Tablet {
         updateMetadata = true;
         lastCompactID = compactionId;
       } else
-        initiateMajorCompaction(MajorCompactionReason.ALL);
+        initiateMajorCompaction(MajorCompactionReason.USER);
     }
     
     if (updateMetadata) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1435371&r1=1435370&r2=1435371&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
Fri Jan 18 22:06:44 2013
@@ -111,6 +111,7 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
@@ -160,10 +161,12 @@ import org.apache.accumulo.server.securi
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.apache.accumulo.server.security.ZKAuthenticator;
+import org.apache.accumulo.server.tabletserver.Compactor.CompactionInfo;
 import org.apache.accumulo.server.tabletserver.Tablet.CommitSession;
 import org.apache.accumulo.server.tabletserver.Tablet.KVEntry;
 import org.apache.accumulo.server.tabletserver.Tablet.LookupResult;
 import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
+import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
 import org.apache.accumulo.server.tabletserver.Tablet.ScanBatch;
 import org.apache.accumulo.server.tabletserver.Tablet.Scanner;
 import org.apache.accumulo.server.tabletserver.Tablet.SplitInfo;
@@ -2121,6 +2124,24 @@ public class TabletServer extends Abstra
       }
     }
     
+    @Override
+    public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, AuthInfo credentials)
throws ThriftSecurityException, TException {
+      try {
+        if (!authenticator.hasSystemPermission(credentials, credentials.user, SystemPermission.SYSTEM))
+          throw new ThriftSecurityException(credentials.user, SecurityErrorCode.PERMISSION_DENIED);
+      } catch (AccumuloSecurityException e) {
+        throw e.asThriftException();
+      }
+      
+      List<CompactionInfo> compactions = Compactor.getRunningCompactions();
+      List<ActiveCompaction> ret = new ArrayList<ActiveCompaction>(compactions.size());
+      
+      for (CompactionInfo compactionInfo : compactions) {
+        ret.add(compactionInfo.toThrift());
+      }
+      
+      return ret;
+    }
   }
   
   private class SplitRunner implements Runnable {
@@ -2192,7 +2213,7 @@ public class TabletServer extends Abstra
             
             if (tablet.getLogCount() >= maxLogEntriesPerTablet) {
               log.debug("Initiating minor compaction for " + tablet.getExtent() + " because
it has " + tablet.getLogCount() + " write ahead logs");
-              tablet.initiateMinorCompaction();
+              tablet.initiateMinorCompaction(MinorCompactionReason.SYSTEM);
             }
             
             synchronized (tablet) {
@@ -2518,7 +2539,7 @@ public class TabletServer extends Abstra
            * it to the logs (the file will be in !METADATA, preventing replay of compacted
data)... but do not want a majc to wipe the file out from !METADATA
            * and then have another process failure... this could cause duplicate data to
replay
            */
-          if (tablet.getNumEntriesInMemory() > 0 && !tablet.minorCompactNow())
{
+          if (tablet.getNumEntriesInMemory() > 0 && !tablet.minorCompactNow(MinorCompactionReason.SYSTEM))
{
             throw new RuntimeException("Minor compaction after recovery fails for " + extentToOpen);
           }
           

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1435371&r1=1435370&r2=1435371&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
Fri Jan 18 22:06:44 2013
@@ -51,6 +51,7 @@ import org.apache.accumulo.core.util.Uti
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
 import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
+import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 import org.apache.hadoop.fs.FileSystem;
@@ -349,7 +350,7 @@ public class TabletServerResourceManager
                 continue;
               }
               
-              if (!tabletReport.getTablet().initiateMinorCompaction()) {
+              if (!tabletReport.getTablet().initiateMinorCompaction(MinorCompactionReason.SYSTEM))
{
                 if (tabletReport.getTablet().isClosed()) {
                   tabletReports.remove(tabletReport.getExtent());
                   log.debug("Ignoring memory manager recommendation: not minor compacting
closed tablet " + keyExtent);
@@ -545,7 +546,7 @@ public class TabletServerResourceManager
     // when too many files are open, we may want tablets to compact down
     // to one map file
     Map<String,Long> findMapFilesToCompact(SortedMap<String,DataFileValue> tabletFiles,
MajorCompactionReason reason) {
-      if (reason == MajorCompactionReason.ALL) {
+      if (reason == MajorCompactionReason.USER) {
         Map<String,Long> files = new HashMap<String,Long>();
         for (Entry<String,DataFileValue> entry : tabletFiles.entrySet()) {
           files.put(entry.getKey(), entry.getValue().getSize());
@@ -634,7 +635,7 @@ public class TabletServerResourceManager
         
       // int threshold;
       
-      if (reason == MajorCompactionReason.ALL)
+      if (reason == MajorCompactionReason.USER)
         return true;
       
       if (reason == MajorCompactionReason.IDLE) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java?rev=1435371&r1=1435370&r2=1435371&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
(original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/performance/thrift/NullTserver.java
Fri Jan 18 22:06:44 2013
@@ -49,6 +49,7 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.security.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.security.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
@@ -202,6 +203,11 @@ public class NullTserver {
     @Override
     public void removeLogs(TInfo tinfo, AuthInfo credentials, List<String> filenames)
throws TException {
     }
+    
+    @Override
+    public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, AuthInfo credentials)
throws ThriftSecurityException, TException {
+      return new ArrayList<ActiveCompaction>();
+    }
   }
   
   static class Opts extends Help {



Mime
View raw message