accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1438328 - in /accumulo/branches/ACCUMULO-259: ./ assemble/ core/ core/src/main/java/org/apache/accumulo/core/util/shell/ core/src/main/java/org/apache/accumulo/core/util/shell/commands/ fate/src/main/java/org/apache/accumulo/fate/ fate/src...
Date Fri, 25 Jan 2013 05:28:55 GMT
Author: vines
Date: Fri Jan 25 05:28:53 2013
New Revision: 1438328

URL: http://svn.apache.org/viewvc?rev=1438328&view=rev
Log:
ACCUMULO-259 Merging up to 1438309


Added:
    accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteShellterCommand.java
      - copied unchanged from r1438327, accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteShellterCommand.java
    accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListShellIterCommand.java
      - copied unchanged from r1438327, accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListShellIterCommand.java
    accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetShellIterCommand.java
      - copied unchanged from r1438327, accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetShellIterCommand.java
Modified:
    accumulo/branches/ACCUMULO-259/   (props changed)
    accumulo/branches/ACCUMULO-259/assemble/   (props changed)
    accumulo/branches/ACCUMULO-259/core/   (props changed)
    accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java
    accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CompactCommand.java
    accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java
    accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListIterCommand.java
    accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java
    accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java
    accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetScanIterCommand.java
    accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
  (props changed)
    accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
  (props changed)
    accumulo/branches/ACCUMULO-259/packages/   (props changed)
    accumulo/branches/ACCUMULO-259/server/   (props changed)
    accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
    accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
    accumulo/branches/ACCUMULO-259/src/   (props changed)

Propchange: accumulo/branches/ACCUMULO-259/
------------------------------------------------------------------------------
  Merged /accumulo/trunk:r1438310-1438327

Propchange: accumulo/branches/ACCUMULO-259/assemble/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/assemble:r1438310-1438327

Propchange: accumulo/branches/ACCUMULO-259/core/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/core:r1438310-1438327

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java?rev=1438328&r1=1438327&r2=1438328&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java
Fri Jan 25 05:28:53 2013
@@ -82,6 +82,7 @@ import org.apache.accumulo.core.util.she
 import org.apache.accumulo.core.util.shell.commands.DeleteManyCommand;
 import org.apache.accumulo.core.util.shell.commands.DeleteRowsCommand;
 import org.apache.accumulo.core.util.shell.commands.DeleteScanIterCommand;
+import org.apache.accumulo.core.util.shell.commands.DeleteShellterCommand;
 import org.apache.accumulo.core.util.shell.commands.DeleteTableCommand;
 import org.apache.accumulo.core.util.shell.commands.DeleteUserCommand;
 import org.apache.accumulo.core.util.shell.commands.DropTableCommand;
@@ -108,6 +109,7 @@ import org.apache.accumulo.core.util.she
 import org.apache.accumulo.core.util.shell.commands.ListCompactionsCommand;
 import org.apache.accumulo.core.util.shell.commands.ListIterCommand;
 import org.apache.accumulo.core.util.shell.commands.ListScansCommand;
+import org.apache.accumulo.core.util.shell.commands.ListShellIterCommand;
 import org.apache.accumulo.core.util.shell.commands.MaxRowCommand;
 import org.apache.accumulo.core.util.shell.commands.MergeCommand;
 import org.apache.accumulo.core.util.shell.commands.NoTableCommand;
@@ -125,6 +127,7 @@ import org.apache.accumulo.core.util.she
 import org.apache.accumulo.core.util.shell.commands.SetGroupsCommand;
 import org.apache.accumulo.core.util.shell.commands.SetIterCommand;
 import org.apache.accumulo.core.util.shell.commands.SetScanIterCommand;
+import org.apache.accumulo.core.util.shell.commands.SetShellIterCommand;
 import org.apache.accumulo.core.util.shell.commands.SleepCommand;
 import org.apache.accumulo.core.util.shell.commands.SystemPermissionsCommand;
 import org.apache.accumulo.core.util.shell.commands.TableCommand;
@@ -169,6 +172,7 @@ public class Shell extends ShellOptions 
   private Class<? extends Formatter> defaultFormatterClass = DefaultFormatter.class;
   private Class<? extends Formatter> binaryFormatterClass = BinaryFormatter.class;
   public Map<String,List<IteratorSetting>> scanIteratorOptions = new HashMap<String,List<IteratorSetting>>();
+  public Map<String,List<IteratorSetting>> iteratorProfiles = new HashMap<String,List<IteratorSetting>>();
   
   private Token rootToken;
   public final Map<String,Command> commandFactory = new TreeMap<String,Command>();
@@ -299,7 +303,8 @@ public class Shell extends ShellOptions 
     Command[] execCommands = {new ExecfileCommand(), new HistoryCommand()};
     Command[] exitCommands = {new ByeCommand(), new ExitCommand(), new QuitCommand()};
     Command[] helpCommands = {new AboutCommand(), new HelpCommand(), new InfoCommand(), new
QuestionCommand()};
-    Command[] iteratorCommands = {new DeleteIterCommand(), new DeleteScanIterCommand(), new
ListIterCommand(), new SetIterCommand(), new SetScanIterCommand()};
+    Command[] iteratorCommands = {new DeleteIterCommand(), new DeleteScanIterCommand(), new
ListIterCommand(), new SetIterCommand(), new SetScanIterCommand(),
+        new SetShellIterCommand(), new ListShellIterCommand(), new DeleteShellterCommand()};
     Command[] otherCommands = {new HiddenCommand()};
     Command[] permissionsCommands = {new GrantCommand(), new RevokeCommand(), new SystemPermissionsCommand(),
new TablePermissionsCommand(),
         new UserPermissionsCommand()};

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CompactCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CompactCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CompactCommand.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CompactCommand.java
Fri Jan 25 05:28:53 2013
@@ -16,8 +16,13 @@
  */
 package org.apache.accumulo.core.util.shell.commands;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.util.shell.Shell;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -25,10 +30,11 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.io.Text;
 
 public class CompactCommand extends TableOperation {
-  private Option noFlushOption, waitOpt;
+  private Option noFlushOption, waitOpt, profileOpt;
   private boolean flush;
   private Text startRow;
   private Text endRow;
+  private List<IteratorSetting> iterators;
   
   boolean override = false;
   private boolean wait;
@@ -44,7 +50,8 @@ public class CompactCommand extends Tabl
       if (wait) {
         Shell.log.info("Compacting table ...");
       }
-      shellState.getConnector().tableOperations().compact(tableName, startRow, endRow, flush,
wait);
+      
+      shellState.getConnector().tableOperations().compact(tableName, startRow, endRow, iterators,
flush, wait);
       
       Shell.log.info("Compaction of table " + tableName + " " + (wait ? "completed" : "started")
+ " for given range");
     } catch (Exception ex) {
@@ -59,6 +66,19 @@ public class CompactCommand extends Tabl
     endRow = OptUtil.getEndRow(cl);
     wait = cl.hasOption(waitOpt.getOpt());
     
+    if (cl.hasOption(profileOpt.getOpt())) {
+      List<IteratorSetting> iterators = shellState.iteratorProfiles.get(cl.getOptionValue(profileOpt.getOpt()));
+      if (iterators == null) {
+        Shell.log.error("Profile " + cl.getOptionValue(profileOpt.getOpt()) + " does not
exist");
+        return -1;
+      }
+      
+      this.iterators = new ArrayList<IteratorSetting>(iterators);
+    } else {
+      this.iterators = Collections.emptyList();
+    }
+
+
     return super.execute(fullCommand, cl, shellState);
   }
   
@@ -73,6 +93,10 @@ public class CompactCommand extends Tabl
     waitOpt = new Option("w", "wait", false, "wait for compact to finish");
     opts.addOption(waitOpt);
     
+    profileOpt = new Option("pn", "profile", true, "iterator profile name");
+    profileOpt.setArgName("profile");
+    opts.addOption(profileOpt);
+
     return opts;
   }
 }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java
Fri Jan 25 05:28:53 2013
@@ -47,7 +47,7 @@ public class DeleteManyCommand extends S
     scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, "NOVALUE", SortedKeyIterator.class));
     
     // handle session-specific scan iterators
-    addScanIterators(shellState, scanner, tableName);
+    addScanIterators(shellState, cl, scanner, tableName);
     
     // handle remaining optional arguments
     scanner.setRange(getRange(cl, interpeter));

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListIterCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListIterCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListIterCommand.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ListIterCommand.java
Fri Jan 25 05:28:53 2013
@@ -79,7 +79,7 @@ public class ListIterCommand extends Com
   }
   
   public String description() {
-    return "lists table-specific iterators";
+    return "lists table-specific iterators configured in this shell session";
   }
   
   @Override

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/ScanCommand.java
Fri Jan 25 05:28:53 2013
@@ -55,6 +55,7 @@ public class ScanCommand extends Command
   private Option optStartRowExclusive;
   private Option optEndRowExclusive;
   private Option timeoutOption;
+  private Option profileOpt;
   
   public int execute(final String fullCommand, final CommandLine cl, final Shell shellState)
throws Exception {
 
@@ -72,7 +73,7 @@ public class ScanCommand extends Command
     final Scanner scanner = shellState.getConnector().createScanner(tableName, auths);
     
     // handle session-specific scan iterators
-    addScanIterators(shellState, scanner, tableName);
+    addScanIterators(shellState, cl, scanner, tableName);
     
     // handle remaining optional arguments
     scanner.setRange(getRange(cl, interpeter));
@@ -117,12 +118,24 @@ public class ScanCommand extends Command
     return Long.MAX_VALUE;
   }
 
-  protected void addScanIterators(final Shell shellState, final Scanner scanner, final String
tableName) {
-    final List<IteratorSetting> tableScanIterators = shellState.scanIteratorOptions.get(shellState.getTableName());
-    if (tableScanIterators == null) {
-      Shell.log.debug("Found no scan iterators to set");
-      return;
+  protected void addScanIterators(final Shell shellState, CommandLine cl, final Scanner scanner,
final String tableName) {
+    
+    List<IteratorSetting> tableScanIterators;
+    if (cl.hasOption(profileOpt.getOpt())) {
+      String profile = cl.getOptionValue(profileOpt.getOpt());
+      tableScanIterators = shellState.iteratorProfiles.get(profile);
+      
+      if (tableScanIterators == null) {
+        throw new IllegalArgumentException("Profile " + profile + " does not exist");
+      }
+    } else {
+      tableScanIterators = shellState.scanIteratorOptions.get(shellState.getTableName());
+      if (tableScanIterators == null) {
+        Shell.log.debug("Found no scan iterators to set");
+        return;
+      }
     }
+
     Shell.log.debug("Found " + tableScanIterators.size() + " scan iterators to set");
     
     for (IteratorSetting setting : tableScanIterators) {
@@ -287,6 +300,9 @@ public class ScanCommand extends Command
     timeoutOption.setArgName("timeout");
     outputFileOpt.setArgName("file");
     
+    profileOpt = new Option("pn", "profile", true, "iterator profile name");
+    profileOpt.setArgName("profile");
+
     o.addOption(scanOptAuths);
     o.addOption(scanOptRow);
     o.addOption(OptUtil.startRowOpt());
@@ -303,6 +319,7 @@ public class ScanCommand extends Command
     o.addOption(formatterInterpeterOpt);
     o.addOption(timeoutOption);
     o.addOption(outputFileOpt);
+    o.addOption(profileOpt);
     
     return o;
   }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java
Fri Jan 25 05:28:53 2013
@@ -57,7 +57,6 @@ public class SetIterCommand extends Comm
   
   public int execute(final String fullCommand, final CommandLine cl, final Shell shellState)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
       IOException, ShellCommandException {
-    final String tableName = OptUtil.getTableOpt(cl, shellState);
     
     final int priority = Integer.parseInt(cl.getOptionValue(priorityOpt.getOpt()));
     
@@ -87,14 +86,18 @@ public class SetIterCommand extends Comm
       throw new ShellCommandException(ErrorCode.INITIALIZATION_FAILURE, "Servers are unable
to load " + aggregatorClass + " as type "
           + Aggregator.class.getName());
     }    
-    setTableProperties(cl, shellState, tableName, priority, options, classname, name);
+    setTableProperties(cl, shellState, priority, options, classname, name);
     
     return 0;
   }
   
-  protected void setTableProperties(final CommandLine cl, final Shell shellState, final String
tableName, final int priority, final Map<String,String> options, final String classname,
final String name)
+  protected void setTableProperties(final CommandLine cl, final Shell shellState, final int
priority, final Map<String,String> options, final String classname,
+      final String name)
       throws AccumuloException, AccumuloSecurityException, ShellCommandException, TableNotFoundException
{
     // remove empty values
+    
+    final String tableName = OptUtil.getTableOpt(cl, shellState);
+
     for (Iterator<Entry<String,String>> i = options.entrySet().iterator(); i.hasNext();)
{
       final Entry<String,String> entry = i.next();
       if (entry.getValue() == null || entry.getValue().isEmpty()) {

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetScanIterCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetScanIterCommand.java?rev=1438328&r1=1438327&r2=1438328&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetScanIterCommand.java
(original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetScanIterCommand.java
Fri Jan 25 05:28:53 2013
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -43,17 +44,29 @@ public class SetScanIterCommand extends 
   @Override
   public int execute(final String fullCommand, final CommandLine cl, final Shell shellState)
throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
       IOException, ShellCommandException {
+    Shell.log.warn("Deprecated, use " + new SetShellIterCommand().getName());
     return super.execute(fullCommand, cl, shellState);
   }
   
   @Override
-  protected void setTableProperties(final CommandLine cl, final Shell shellState, final String
tableName, final int priority, final Map<String,String> options, final String classname,
final String name)
-      throws AccumuloException, AccumuloSecurityException, ShellCommandException, TableNotFoundException
{
+  protected void setTableProperties(final CommandLine cl, final Shell shellState, final int
priority, final Map<String,String> options, final String classname,
+      final String name) throws AccumuloException, AccumuloSecurityException, ShellCommandException,
TableNotFoundException {
+    
+    final String tableName = OptUtil.getTableOpt(cl, shellState);
+
     // instead of setting table properties, just put the options in a list to use at scan
time
     if (!shellState.getConnector().instanceOperations().testClassLoad(classname, SortedKeyValueIterator.class.getName()))
{
       throw new ShellCommandException(ErrorCode.INITIALIZATION_FAILURE, "Servers are unable
to load " + classname + " as type "
           + SortedKeyValueIterator.class.getName());
     }
+    
+    for (Iterator<Entry<String,String>> i = options.entrySet().iterator(); i.hasNext();)
{
+      final Entry<String,String> entry = i.next();
+      if (entry.getValue() == null || entry.getValue().isEmpty()) {
+        i.remove();
+      }
+    }
+
     List<IteratorSetting> tableScanIterators = shellState.scanIteratorOptions.get(tableName);
     if (tableScanIterators == null) {
       tableScanIterators = new ArrayList<IteratorSetting>();

Propchange: accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
  Merged /accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1438310-1438327

Propchange: accumulo/branches/ACCUMULO-259/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
  Merged /accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1438310-1438327

Propchange: accumulo/branches/ACCUMULO-259/packages/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/packages:r1438310-1438327

Propchange: accumulo/branches/ACCUMULO-259/server/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/server:r1438310-1438327

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java?rev=1438328&r1=1438327&r2=1438328&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
(original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/CompactRange.java
Fri Jan 25 05:28:53 2013
@@ -217,14 +217,13 @@ public class CompactRange extends Master
           long flushID = Long.parseLong(new String(tokens[0]));
           flushID++;
           
+          if (tokens.length > 1) {
+            throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT,
TableOperationExceptionType.OTHER,
+                "Another compaction with iterators is running");
+          }
+
           String txidString = String.format("%016x", tid);
           StringBuilder encodedIterators = new StringBuilder();
-          for (int i = 1; i < tokens.length; i++) {
-            if (tokens[i].startsWith(txidString))
-              continue; // skip self
-            encodedIterators.append(",");
-            encodedIterators.append(tokens[i]);
-          }
 
           if (iterators != null && iterators.getIterators().size() > 0) {
             Hex hex = new Hex();

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1438328&r1=1438327&r2=1438328&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
(original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
Fri Jan 25 05:28:53 2013
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.cloudtrace.instrument.Span;
 import org.apache.accumulo.cloudtrace.instrument.Trace;
@@ -38,11 +39,12 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
 import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator;
-import org.apache.accumulo.core.iterators.system.CountingIterator;
 import org.apache.accumulo.core.iterators.system.DeletingIterator;
 import org.apache.accumulo.core.iterators.system.MultiIterator;
 import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
@@ -67,6 +69,43 @@ import org.apache.log4j.Logger;
 
 public class Compactor implements Callable<CompactionStats> {
   
+  public class CountingIterator extends WrappingIterator {
+    
+    private long count;
+    
+    public CountingIterator deepCopy(IteratorEnvironment env) {
+      return new CountingIterator(this, env);
+    }
+    
+    private CountingIterator(CountingIterator other, IteratorEnvironment env) {
+      setSource(other.getSource().deepCopy(env));
+      count = 0;
+    }
+    
+    public CountingIterator(SortedKeyValueIterator<Key,Value> source) {
+      this.setSource(source);
+      count = 0;
+    }
+    
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String>
options, IteratorEnvironment env) {
+      throw new UnsupportedOperationException();
+    }
+    
+    @Override
+    public void next() throws IOException {
+      super.next();
+      count++;
+      if (count % 1024 == 0) {
+        entriesRead.addAndGet(1024);
+      }
+    }
+    
+    public long getCount() {
+      return count;
+    }
+  }
+
   private static final Logger log = Logger.getLogger(Compactor.class);
   
   static class CompactionCanceledException extends Exception {
@@ -93,37 +132,22 @@ public class Compactor implements Callab
   // things to report
   private String currentLocalityGroup = "";
   private long startTime;
-  private long currentEntriesRead = 0;
-  private long currentEntriesWritten = 0;
-  private long totalEntriesRead = 0;
-  private long totalEntriesWritten = 0;
+
   private MajorCompactionReason reason;
   protected MinorCompactionReason mincReason;
   
-  private synchronized void updateStats(long read, long written) {
-    this.currentEntriesRead = read;
-    this.currentEntriesWritten = written;
-  }
-
-  private synchronized void clearStats() {
-    totalEntriesRead = 0;
-    totalEntriesWritten = 0;
-    currentEntriesRead = 0;
-    currentEntriesWritten = 0;
-    currentLocalityGroup = "";
-  }
-
-  private synchronized void rollStats() {
-    this.totalEntriesRead = currentEntriesRead;
-    this.totalEntriesWritten = currentEntriesWritten;
-    currentEntriesRead = 0;
-    currentEntriesWritten = 0;
-  }
+  private AtomicLong entriesRead = new AtomicLong(0);
+  private AtomicLong entriesWritten = new AtomicLong(0);
   
   private synchronized void setLocalityGroup(String name) {
     this.currentLocalityGroup = name;
   }
 
+  private void clearStats() {
+    entriesRead.set(0);
+    entriesWritten.set(0);
+  }
+
   protected static Set<Compactor> runningCompactions = Collections.synchronizedSet(new
HashSet<Compactor>());
   
   public static class CompactionInfo {
@@ -134,13 +158,9 @@ public class Compactor implements Callab
     private long entriesWritten;
     
     CompactionInfo(Compactor compactor) {
-      // get a consistent snapshot of changing stats
-      synchronized (compactor) {
-        this.localityGroup = compactor.currentLocalityGroup;
-        this.entriesRead = compactor.totalEntriesRead + compactor.currentEntriesRead;
-        this.entriesWritten = compactor.totalEntriesWritten + compactor.currentEntriesWritten;
-      }
-      
+      this.localityGroup = compactor.currentLocalityGroup;
+      this.entriesRead = compactor.entriesRead.get();
+      this.entriesWritten = compactor.entriesWritten.get();
       this.compactor = compactor;
     }
 
@@ -334,7 +354,7 @@ public class Compactor implements Callab
       }
     }
   }
-  
+
   private List<SortedKeyValueIterator<Key,Value>> openMapDataFiles(String lgName,
ArrayList<FileSKVIterator> readers) throws IOException {
     
     List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
@@ -429,12 +449,10 @@ public class Compactor implements Callab
           entriesCompacted++;
           
           if (entriesCompacted % 1024 == 0) {
-            // Periodically update stats, do not want to do this too often since its syncronized
-            updateStats(citr.getCount(), entriesCompacted);
+            // Periodically update stats, do not want to do this too often since its volatile
+            entriesWritten.addAndGet(1024);
           }
         }
-        
-        rollStats();
 
         if (itr.hasTop() && !env.isCompactionEnabled()) {
           // cancel major compaction operation

Propchange: accumulo/branches/ACCUMULO-259/src/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src:r1438310-1438327



Mime
View raw message