accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject svn commit: r1353663 [4/6] - in /accumulo/branches/ACCUMULO-259: ./ bin/ conf/ conf/examples/1GB/native-standalone/ conf/examples/1GB/standalone/ conf/examples/512MB/native-standalone/ conf/examples/512MB/standalone/ core/ core/src/main/java/org/apache...
Date Mon, 25 Jun 2012 17:09:39 GMT
Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/AddressUtil.java Mon Jun 25 17:09:31 2012
@@ -23,7 +23,9 @@ import org.apache.thrift.transport.TSock
 
 public class AddressUtil {
   static public InetSocketAddress parseAddress(String address, int defaultPort) throws NumberFormatException {
-    final String[] parts = address.split(":", 2);
+    String[] parts = address.split(":", 2);
+    if (address.contains("+"))
+      parts = address.split("\\+", 2);
     if (parts.length == 2) {
       if (parts[1].isEmpty())
         return new InetSocketAddress(parts[0], defaultPort);

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java Mon Jun 25 17:09:31 2012
@@ -170,7 +170,7 @@ public class Shell extends ShellOptions 
   protected boolean configError = false;
   
   // exit if true
-  protected boolean exit = false;
+  private boolean exit = false;
   
   // file to execute commands from
   protected String execFile = null;
@@ -182,10 +182,12 @@ public class Shell extends ShellOptions 
   private boolean disableAuthTimeout;
   private long authTimeout;
   private long lastUserActivity = System.currentTimeMillis();
+  private boolean logErrorsToConsole = false;
+  private PrintWriter writer = null;
+  private boolean masking = false;
   
   public Shell() throws IOException {
-    super();
-    this.reader = new ConsoleReader();
+    this(new ConsoleReader());
   }
   
   public Shell(ConsoleReader reader) {
@@ -193,8 +195,13 @@ public class Shell extends ShellOptions 
     this.reader = reader;
   }
   
+  public Shell(ConsoleReader reader, PrintWriter writer) {
+    this(reader);
+    this.writer = writer;
+  }
+  
   // Not for client use
-  public void config(String... args) {
+  public boolean config(String... args) {
     
     CommandLine cl;
     try {
@@ -205,7 +212,7 @@ public class Shell extends ShellOptions 
       if (cl.hasOption(helpOpt.getOpt())) {
         configError = true;
         printHelp("shell", SHELL_DESCRIPTION, opts);
-        return;
+        return true;
       }
       
       setDebugging(cl.hasOption(debugOption.getLongOpt()));
@@ -219,7 +226,7 @@ public class Shell extends ShellOptions 
       configError = true;
       printException(e);
       printHelp("shell", SHELL_DESCRIPTION, opts);
-      return;
+      return true;
     }
     
     // get the options that were parsed
@@ -249,11 +256,11 @@ public class Shell extends ShellOptions 
       });
       
       if (passw == null)
-        passw = reader.readLine("Enter current password for '" + user + "'@'" + instance.getInstanceName() + "': ", '*');
+        passw = readMaskedLine("Enter current password for '" + user + "'@'" + instance.getInstanceName() + "': ", '*');
       if (passw == null) {
         reader.printNewline();
         configError = true;
-        return;
+        return true;
       } // user canceled
       
       pass = passw.getBytes();
@@ -317,6 +324,7 @@ public class Shell extends ShellOptions 
     for (Command cmd : otherCommands) {
       commandFactory.put(cmd.getName(), cmd);
     }
+    return configError;
   }
   
   @SuppressWarnings("deprecation")
@@ -390,7 +398,7 @@ public class Shell extends ShellOptions 
     }
     
     while (true) {
-      if (exit)
+      if (hasExited())
         return exitCode;
       
       // If tab completion is true we need to reset
@@ -417,6 +425,7 @@ public class Shell extends ShellOptions 
     reader.printString("\n" + SHELL_DESCRIPTION + "\n" + "- \n" + "- version: " + Constants.VERSION + "\n" + "- instance name: "
         + connector.getInstance().getInstanceName() + "\n" + "- instance id: " + connector.getInstance().getInstanceID() + "\n" + "- \n"
         + "- type 'help' for a list of available commands\n" + "- \n");
+    reader.flushConsole();
   }
   
   public void printVerboseInfo() throws IOException {
@@ -446,7 +455,7 @@ public class Shell extends ShellOptions 
     reader.printString(sb.toString());
   }
   
-  protected String getDefaultPrompt() {
+  public String getDefaultPrompt() {
     return connector.whoami() + "@" + connector.getInstance().getInstanceName() + (getTableName().isEmpty() ? "" : " ") + getTableName() + "> ";
   }
   
@@ -479,6 +488,7 @@ public class Shell extends ShellOptions 
         sc = commandFactory.get(command);
         if (sc == null) {
           reader.printString(String.format("Unknown command \"%s\".  Enter \"help\" for a list possible commands.\n", command));
+          reader.flushConsole();
           return;
         }
         
@@ -486,7 +496,7 @@ public class Shell extends ShellOptions 
           reader.printString("Shell has been idle for too long. Please re-authenticate.\n");
           boolean authFailed = true;
           do {
-            String pwd = reader.readLine("Enter current password for '" + connector.whoami() + "': ", '*');
+            String pwd = readMaskedLine("Enter current password for '" + connector.whoami() + "': ", '*');
             if (pwd == null) {
               reader.printNewline();
               return;
@@ -515,7 +525,7 @@ public class Shell extends ShellOptions 
         int expectedArgLen = sc.numArgs();
         if (cl.hasOption(helpOption)) {
           // Display help if asked to; otherwise execute the command
-          sc.printHelp();
+          sc.printHelp(this);
         } else if (expectedArgLen != NO_FIXED_ARG_LENGTH_CHECK && actualArgLen != expectedArgLen) {
           ++exitCode;
           // Check for valid number of fixed arguments (if not
@@ -523,7 +533,7 @@ public class Shell extends ShellOptions 
           // vararg-like commands)
           printException(new IllegalArgumentException(String.format("Expected %d argument%s. There %s %d.", expectedArgLen, expectedArgLen == 1 ? "" : "s",
               actualArgLen == 1 ? "was" : "were", actualArgLen)));
-          sc.printHelp();
+          sc.printHelp(this);
         } else {
           int tmpCode = sc.execute(input, cl, this);
           exitCode += tmpCode;
@@ -546,7 +556,7 @@ public class Shell extends ShellOptions 
           printException(e);
         }
         if (sc != null)
-          sc.printHelp();
+          sc.printHelp(this);
       } catch (Exception e) {
         ++exitCode;
         printException(e);
@@ -555,6 +565,7 @@ public class Shell extends ShellOptions 
       ++exitCode;
       printException(new BadArgumentException("Unrecognized empty command", command, -1));
     }
+    reader.flushConsole();
   }
   
   /**
@@ -683,12 +694,12 @@ public class Shell extends ShellOptions 
     
     // The general version of this method uses the HelpFormatter
     // that comes with the apache Options package to print out the help
-    public final void printHelp() {
-      Shell.printHelp(usage(), "description: " + this.description(), getOptionsWithHelp());
+    public final void printHelp(Shell shellState) {
+      shellState.printHelp(usage(), "description: " + this.description(), getOptionsWithHelp());
     }
     
-    public final void printHelp(int width) {
-      Shell.printHelp(usage(), "description: " + this.description(), getOptionsWithHelp(), width);
+    public final void printHelp(Shell shellState, int width) {
+      shellState.printHelp(usage(), "description: " + this.description(), getOptionsWithHelp(), width);
     }
     
     // Get options with help
@@ -829,20 +840,20 @@ public class Shell extends ShellOptions 
     printException(cve, "");
     int COL1 = 50, COL2 = 14;
     int col3 = Math.max(1, Math.min(Integer.MAX_VALUE, reader.getTermwidth() - COL1 - COL2 - 6));
-    log.error(String.format("%" + COL1 + "s-+-%" + COL2 + "s-+-%" + col3 + "s\n", repeat("-", COL1), repeat("-", COL2), repeat("-", col3)));
-    log.error(String.format("%-" + COL1 + "s | %" + COL2 + "s | %-" + col3 + "s\n", "Constraint class", "Violation code", "Violation Description"));
-    log.error(String.format("%" + COL1 + "s-+-%" + COL2 + "s-+-%" + col3 + "s\n", repeat("-", COL1), repeat("-", COL2), repeat("-", col3)));
+    logError(String.format("%" + COL1 + "s-+-%" + COL2 + "s-+-%" + col3 + "s\n", repeat("-", COL1), repeat("-", COL2), repeat("-", col3)));
+    logError(String.format("%-" + COL1 + "s | %" + COL2 + "s | %-" + col3 + "s\n", "Constraint class", "Violation code", "Violation Description"));
+    logError(String.format("%" + COL1 + "s-+-%" + COL2 + "s-+-%" + col3 + "s\n", repeat("-", COL1), repeat("-", COL2), repeat("-", col3)));
     for (TConstraintViolationSummary cvs : cve.violationSummaries)
-      log.error(String.format("%-" + COL1 + "s | %" + COL2 + "d | %-" + col3 + "s\n", cvs.constrainClass, cvs.violationCode, cvs.violationDescription));
-    log.error(String.format("%" + COL1 + "s-+-%" + COL2 + "s-+-%" + col3 + "s\n", repeat("-", COL1), repeat("-", COL2), repeat("-", col3)));
+      logError(String.format("%-" + COL1 + "s | %" + COL2 + "d | %-" + col3 + "s\n", cvs.constrainClass, cvs.violationCode, cvs.violationDescription));
+    logError(String.format("%" + COL1 + "s-+-%" + COL2 + "s-+-%" + col3 + "s\n", repeat("-", COL1), repeat("-", COL2), repeat("-", col3)));
   }
   
-  public static final void printException(Exception e) {
+  public final void printException(Exception e) {
     printException(e, e.getMessage());
   }
   
-  private static final void printException(Exception e, String msg) {
-    log.error(e.getClass().getName() + (msg != null ? ": " + msg : ""));
+  private final void printException(Exception e, String msg) {
+    logError(e.getClass().getName() + (msg != null ? ": " + msg : ""));
     log.debug(e.getClass().getName() + (msg != null ? ": " + msg : ""), e);
   }
   
@@ -854,14 +865,18 @@ public class Shell extends ShellOptions 
     return Logger.getLogger(Constants.CORE_PACKAGE_NAME).isTraceEnabled();
   }
   
-  private static final void printHelp(String usage, String description, Options opts) {
+  private final void printHelp(String usage, String description, Options opts) {
     printHelp(usage, description, opts, Integer.MAX_VALUE);
   }
   
-  private static final void printHelp(String usage, String description, Options opts, int width) {
+  private final void printHelp(String usage, String description, Options opts, int width) {
     PrintWriter pw = new PrintWriter(System.err);
     new HelpFormatter().printHelp(pw, width, usage, description, opts, 2, 5, null, true);
     pw.flush();
+    if (logErrorsToConsole && writer != null) {
+      new HelpFormatter().printHelp(writer, width, usage, description, opts, 2, 5, null, true);
+      writer.flush();
+    }
   }
   
   public int getExitCode() {
@@ -921,10 +936,39 @@ public class Shell extends ShellOptions 
     Class<? extends Formatter> formatter = FormatterCommand.getCurrentFormatter(tableName, this);
     
     if (null == formatter) {
-      log.error("Could not load the specified formatter. Using the DefaultFormatter");
+      logError("Could not load the specified formatter. Using the DefaultFormatter");
       return this.defaultFormatterClass;
     } else {
       return formatter;
     }
   }
+  
+  public void setLogErrorsToConsole() {
+    this.logErrorsToConsole = true;
+  }
+  
+  private void logError(String s) {
+    log.error(s);
+    if (logErrorsToConsole) {
+      try {
+        reader.printString("ERROR: " + s + "\n");
+        reader.flushConsole();
+      } catch (IOException e) {}
+    }
+  }
+  
+  public String readMaskedLine(String prompt, Character mask) throws IOException {
+    this.masking = true;
+    String s = reader.readLine(prompt, mask);
+    this.masking = false;
+    return s;
+  }
+  
+  public boolean isMasking() {
+    return masking;
+  }
+  
+  public boolean hasExited() {
+    return exit;
+  }
 }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/AuthenticateCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/AuthenticateCommand.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/AuthenticateCommand.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/AuthenticateCommand.java Mon Jun 25 17:09:31 2012
@@ -31,7 +31,7 @@ public class AuthenticateCommand extends
   @Override
   public int execute(String fullCommand, CommandLine cl, Shell shellState) throws AccumuloException, AccumuloSecurityException, IOException {
     String user = cl.getArgs()[0];
-    String p = shellState.getReader().readLine("Enter current password for '" + user + "': ", '*');
+    String p = shellState.readMaskedLine("Enter current password for '" + user + "': ", '*');
     if (p == null) {
       shellState.getReader().printNewline();
       return 0;

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CreateUserCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CreateUserCommand.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CreateUserCommand.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/CreateUserCommand.java Mon Jun 25 17:09:31 2012
@@ -46,12 +46,12 @@ public class CreateUserCommand extends C
     String password = null;
     String passwordConfirm = null;
     
-    password = shellState.getReader().readLine("Enter new password for '" + user + "': ", '*');
+    password = shellState.readMaskedLine("Enter new password for '" + user + "': ", '*');
     if (password == null) {
       shellState.getReader().printNewline();
       return 0;
     } // user canceled
-    passwordConfirm = shellState.getReader().readLine("Please confirm new password for '" + user + "': ", '*');
+    passwordConfirm = shellState.readMaskedLine("Please confirm new password for '" + user + "': ", '*');
     if (passwordConfirm == null) {
       shellState.getReader().printNewline();
       return 0;

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DebugCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DebugCommand.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DebugCommand.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DebugCommand.java Mon Jun 25 17:09:31 2012
@@ -39,8 +39,8 @@ public class DebugCommand extends Comman
     } else if (cl.getArgs().length == 0) {
       shellState.getReader().printString(Shell.isDebuggingEnabled() ? "on\n" : "off\n");
     } else {
-      Shell.printException(new IllegalArgumentException("Expected 0 or 1 argument. There were " + cl.getArgs().length + "."));
-      printHelp();
+      shellState.printException(new IllegalArgumentException("Expected 0 or 1 argument. There were " + cl.getArgs().length + "."));
+      printHelp(shellState);
       return 1;
     }
     return 0;

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/HelpCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/HelpCommand.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/HelpCommand.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/HelpCommand.java Mon Jun 25 17:09:31 2012
@@ -89,7 +89,7 @@ public class HelpCommand extends Command
       if (c == null)
         shellState.getReader().printString(String.format("Unknown command \"%s\".  Enter \"help\" for a list possible commands.\n", cmd));
       else
-        c.printHelp(numColumns);
+        c.printHelp(shellState, numColumns);
     }
     return 0;
   }

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/PasswdCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/PasswdCommand.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/PasswdCommand.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/PasswdCommand.java Mon Jun 25 17:09:31 2012
@@ -41,7 +41,7 @@ public class PasswdCommand extends Comma
     String passwordConfirm = null;
     String oldPassword = null;
     
-    oldPassword = shellState.getReader().readLine("Enter current password for '" + currentUser + "': ", '*');
+    oldPassword = shellState.readMaskedLine("Enter current password for '" + currentUser + "': ", '*');
     if (oldPassword == null) {
       shellState.getReader().printNewline();
       return 0;
@@ -50,12 +50,12 @@ public class PasswdCommand extends Comma
     if (!shellState.getConnector().securityOperations().authenticateUser(currentUser, oldPassword.getBytes()))
       throw new AccumuloSecurityException(user, SecurityErrorCode.BAD_CREDENTIALS);
     
-    password = shellState.getReader().readLine("Enter new password for '" + user + "': ", '*');
+    password = shellState.readMaskedLine("Enter new password for '" + user + "': ", '*');
     if (password == null) {
       shellState.getReader().printNewline();
       return 0;
     } // user canceled
-    passwordConfirm = shellState.getReader().readLine("Please confirm new password for '" + user + "': ", '*');
+    passwordConfirm = shellState.readMaskedLine("Please confirm new password for '" + user + "': ", '*');
     if (passwordConfirm == null) {
       shellState.getReader().printNewline();
       return 0;

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/SetIterCommand.java Mon Jun 25 17:09:31 2012
@@ -49,6 +49,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
 
+@SuppressWarnings("deprecation")
 public class SetIterCommand extends Command {
   
   private Option mincScopeOpt, majcScopeOpt, scanScopeOpt, nameOpt, priorityOpt;
@@ -152,11 +153,13 @@ public class SetIterCommand extends Comm
       if (itopts.getNamedOptions() != null) {
         for (Entry<String,String> e : itopts.getNamedOptions().entrySet()) {
           prompt = Shell.repeat("-", 10) + "> set " + shortClassName + " parameter " + e.getKey() + ", " + e.getValue() + ": ";
-          
+          reader.flushConsole();
           input = reader.readLine(prompt);
           if (input == null) {
             reader.printNewline();
             throw new IOException("Input stream closed");
+          } else {
+            input = new String(input);
           }
           // Places all Parameters and Values into the LocalOptions, even if the value is "".
           // This allows us to check for "" values when setting the iterators and allows us to remove
@@ -172,10 +175,13 @@ public class SetIterCommand extends Comm
           while (true) {
             prompt = Shell.repeat("-", 10) + "> set " + shortClassName + " option (<name> <value>, hit enter to skip): ";
             
+            reader.flushConsole();
             input = reader.readLine(prompt);
             if (input == null) {
               reader.printNewline();
               throw new IOException("Input stream closed");
+            } else {
+              input = new String(input);
             }
             
             if (input.length() == 0)

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/TraceCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/TraceCommand.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/TraceCommand.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/TraceCommand.java Mon Jun 25 17:09:31 2012
@@ -32,7 +32,6 @@ import org.apache.accumulo.core.util.she
 import org.apache.commons.cli.CommandLine;
 import org.apache.hadoop.io.Text;
 
-
 public class TraceCommand extends DebugCommand {
   
   public int execute(String fullCommand, CommandLine cl, final Shell shellState) throws IOException {
@@ -66,7 +65,7 @@ public class TraceCommand extends DebugC
                 break;
               }
             } catch (Exception ex) {
-              Shell.printException(ex);
+              shellState.printException(ex);
             }
             shellState.getReader().printString("Waiting for trace information\n");
             shellState.getReader().flushConsole();
@@ -80,8 +79,8 @@ public class TraceCommand extends DebugC
     } else if (cl.getArgs().length == 0) {
       shellState.getReader().printString(Trace.isTracing() ? "on\n" : "off\n");
     } else {
-      Shell.printException(new IllegalArgumentException("Expected 0 or 1 argument. There were " + cl.getArgs().length + "."));
-      printHelp();
+      shellState.printException(new IllegalArgumentException("Expected 0 or 1 argument. There were " + cl.getArgs().length + "."));
+      printHelp(shellState);
       return 1;
     }
     return 0;

Modified: accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/UserCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/UserCommand.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/UserCommand.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/java/org/apache/accumulo/core/util/shell/commands/UserCommand.java Mon Jun 25 17:09:31 2012
@@ -38,7 +38,7 @@ public class UserCommand extends Command
     // We can't let the wrapping try around the execute method deal
     // with the exceptions because we have to do something if one
     // of these methods fails
-    String p = shellState.getReader().readLine("Enter password for user " + user + ": ", '*');
+    String p = shellState.readMaskedLine("Enter password for user " + user + ": ", '*');
     if (p == null) {
       shellState.getReader().printNewline();
       return 0;

Modified: accumulo/branches/ACCUMULO-259/core/src/main/thrift/master.thrift
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/thrift/master.thrift?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/thrift/master.thrift (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/thrift/master.thrift Mon Jun 25 17:09:31 2012
@@ -42,16 +42,9 @@ struct TableInfo {
 }
 
 struct RecoveryStatus {
-    1:string host
     2:string name
-    3:double mapProgress
-    4:double reduceProgress
     5:i32 runtime                   // in millis
-    6:double copyProgress
-}
-
-struct LoggerStatus {
-    1:string logger
+    6:double progress
 }
 
 struct TabletServerStatus {
@@ -61,11 +54,11 @@ struct TabletServerStatus {
     5:double osLoad
     7:i64 holdTime
     8:i64 lookups
-    9:set<string> loggers
     10:i64 indexCacheHits    
     11:i64 indexCacheRequest   
     12:i64 dataCacheHits   
-    13:i64 dataCacheRequest   
+    13:i64 dataCacheRequest
+    14:list<RecoveryStatus> logSorts
 }
 
 enum MasterState {
@@ -94,14 +87,11 @@ struct MasterMonitorInfo {
     1:map<string, TableInfo> tableMap
     2:list<TabletServerStatus> tServerInfo
     3:map<string, byte> badTServers
-    4:list<RecoveryStatus> recovery
-    5:list<LoggerStatus> loggers
     6:MasterState state
     8:MasterGoalState goalState
     7:i32 unassignedTablets
     9:set<string> serversShuttingDown
     10:list<DeadServer> deadTabletServers
-    11:list<DeadServer> deadLoggers
 }
 
 struct TabletSplit {

Modified: accumulo/branches/ACCUMULO-259/core/src/main/thrift/tabletserver.thrift
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/main/thrift/tabletserver.thrift?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/main/thrift/tabletserver.thrift (original)
+++ accumulo/branches/ACCUMULO-259/core/src/main/thrift/tabletserver.thrift Mon Jun 25 17:09:31 2012
@@ -148,8 +148,6 @@ service TabletClientService extends clie
   oneway void chop(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials, 3:string lock, 4:data.TKeyExtent extent),
   oneway void compact(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials, 3:string lock, 4:string tableId, 5:binary startRow, 6:binary endRow),
   
-  oneway void useLoggers(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:set<string> loggers),
-  
   master.TabletServerStatus getTabletServerStatus(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec)
   list<TabletStats> getTabletStats(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string tableId) throws (1:security.ThriftSecurityException sec)
   TabletStats getHistoricalStats(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec)
@@ -157,57 +155,13 @@ service TabletClientService extends clie
   oneway void fastHalt(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string lock);
   
   list<ActiveScan> getActiveScans(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec)
+  oneway void removeLogs(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials, 3:list<string> filenames)
 }
 
-// LogID should be cryptographically unguessable
-typedef i64 LogID
 typedef i32 TabletID
 
-exception NoSuchLogIDException {
-}
-
-exception LoggerClosedException {
-}
-
-struct LogFile {
-        1:string name,
-        2:LogID  id,
-}
-
 struct TabletMutations {
 	1:TabletID tabletID,
 	2:i64 seq,
 	3:list<data.TMutation> mutations
 }
-
-struct LogCopyInfo {
-	1:i64 fileSize,
-	2:string loggerZNode
-}
-
-service MutationLogger {
-    LogFile create(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:string tserverSession) throws (1:security.ThriftSecurityException sec, 2:LoggerClosedException lce),
-
-    // Note that these methods correspond to org.apache.accumulo.server.tabletserver.log.TabletLog
-    void defineTablet(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:data.TKeyExtent tablet) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-    void log(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:data.TMutation mutation) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-    void logManyTablets(3:cloudtrace.TInfo tinfo, 1:LogID id, 2:list<TabletMutations> mutations) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-    void minorCompactionStarted(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:string fqfn) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-    void minorCompactionFinished(5:cloudtrace.TInfo tinfo, 1:LogID id, 2:i64 seq, 3:TabletID tid, 4:string fqfn) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-    void close(2:cloudtrace.TInfo tinfo, 1:LogID id) throws (1:NoSuchLogIDException nsli, 2:LoggerClosedException lce),
-
-    // close a log file and initiate the push to HDFS
-    LogCopyInfo startCopy(4:cloudtrace.TInfo tinfo,
-                  1:security.AuthInfo credentials,
-                  2:string name, 
-                  3:string fullyQualifiedFileName,
-                  5:bool sort) throws (1:security.ThriftSecurityException sec),
-
-    // Support log garbage collection              
-    list<string> getClosedLogs(2:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec),
-    oneway void remove(3:cloudtrace.TInfo tinfo, 1:security.AuthInfo credentials, 2:list<string> files),
-    
-    // Shutdown
-    void beginShutdown(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials) throws (1:security.ThriftSecurityException sec);
-    oneway void halt(1:cloudtrace.TInfo tinfo, 2:security.AuthInfo credentials);
-}

Modified: accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/util/shell/ShellTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/util/shell/ShellTest.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/util/shell/ShellTest.java (original)
+++ accumulo/branches/ACCUMULO-259/core/src/test/java/org/apache/accumulo/core/util/shell/ShellTest.java Mon Jun 25 17:09:31 2012
@@ -32,7 +32,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class ShellTest {
-  class TestOutputStream extends OutputStream {
+  static class TestOutputStream extends OutputStream {
     StringBuilder sb = new StringBuilder();
     
     @Override
@@ -62,7 +62,7 @@ public class ShellTest {
     if (expectGoodExit)
       assertGoodExit("", true);
     else
-      assertBadExit();
+      assertBadExit("", true);
   }
   
   void exec(String cmd, boolean expectGoodExit, String expectString) throws IOException {
@@ -74,7 +74,7 @@ public class ShellTest {
     if (expectGoodExit)
       assertGoodExit(expectString, stringPresent);
     else
-      assertBadExit();
+      assertBadExit(expectString, stringPresent);
   }
   
   @Before
@@ -82,6 +82,7 @@ public class ShellTest {
     Shell.log.setLevel(Level.OFF);
     output = new TestOutputStream();
     shell = new Shell(new ConsoleReader(new FileInputStream(FileDescriptor.in), new OutputStreamWriter(output)));
+    shell.setLogErrorsToConsole();
     shell.config("--fake", "-p", "pass");
   }
   
@@ -92,9 +93,11 @@ public class ShellTest {
       assertEquals(s + " present in " + output.get() + " was not " + stringPresent, stringPresent, output.get().contains(s));
   }
   
-  void assertBadExit() {
+  void assertBadExit(String s, boolean stringPresent) {
     Shell.log.debug(output.get());
     assertTrue(shell.getExitCode() > 0);
+    if (s.length() > 0)
+      assertEquals(s + " present in " + output.get() + " was not " + stringPresent, stringPresent, output.get().contains(s));
     shell.resetExitCode();
   }
   
@@ -103,13 +106,13 @@ public class ShellTest {
     Shell.log.debug("Starting about test -----------------------------------");
     exec("about", true, "Shell - Apache Accumulo Interactive Shell");
     exec("about -v", true, "Current user:");
-    exec("about arg", false);
+    exec("about arg", false, "java.lang.IllegalArgumentException: Expected 0 arguments");
   }
   
   @Test
   public void addGetSplitsTest() throws IOException {
     Shell.log.debug("Starting addGetSplits test ----------------------------");
-    exec("addsplits arg", false);
+    exec("addsplits arg", false, "java.lang.IllegalStateException: Not in a table context");
     exec("createtable test", true);
     exec("addsplits 1 \\x80", true);
     exec("getsplits", true, "1\n\\x80");
@@ -119,8 +122,8 @@ public class ShellTest {
   @Test
   public void insertDeleteScanTest() throws IOException {
     Shell.log.debug("Starting insertDeleteScan test ------------------------");
-    exec("insert r f q v", false);
-    exec("delete r f q", false);
+    exec("insert r f q v", false, "java.lang.IllegalStateException: Not in a table context");
+    exec("delete r f q", false, "java.lang.IllegalStateException: Not in a table context");
     exec("createtable test", true);
     exec("insert r f q v", true);
     exec("scan", true, "r f:q []    v");

Modified: accumulo/branches/ACCUMULO-259/docs/examples/README
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/docs/examples/README?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/docs/examples/README (original)
+++ accumulo/branches/ACCUMULO-259/docs/examples/README Mon Jun 25 17:09:31 2012
@@ -62,7 +62,10 @@ features of Apache Accumulo.
 
    README.isolation:   Using the isolated scanner to ensure partial changes 
                        are not seen.
- 
+
+   README.mapred:      Using MapReduce to read from and write to Accumulo 
+                       tables.
+
    README.maxmutation: Limiting mutation size to avoid running out of memory.
 
    README.shard:       Using the intersecting iterator with a term index 

Modified: accumulo/branches/ACCUMULO-259/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java (original)
+++ accumulo/branches/ACCUMULO-259/examples/wikisearch/ingest/src/main/java/org/apache/accumulo/examples/wikisearch/ingest/WikipediaPartitionedIngester.java Mon Jun 25 17:09:31 2012
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.client.m
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.user.SummingCombiner;
-import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.log_args;
 import org.apache.accumulo.examples.wikisearch.ingest.ArticleExtractor.Article;
 import org.apache.accumulo.examples.wikisearch.iterator.GlobalIndexUidCombiner;
 import org.apache.accumulo.examples.wikisearch.iterator.TextIndexCombiner;

Modified: accumulo/branches/ACCUMULO-259/pom.xml
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/pom.xml?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/pom.xml (original)
+++ accumulo/branches/ACCUMULO-259/pom.xml Mon Jun 25 17:09:31 2012
@@ -498,7 +498,7 @@
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-core</artifactId>
-        <version>0.20.203.0</version>
+        <version>0.20.205.0</version>
         <scope>provided</scope>
       </dependency>
       <dependency>

Propchange: accumulo/branches/ACCUMULO-259/server/
------------------------------------------------------------------------------
  Reverse-merged /incubator/accumulo/trunk/src/server:r1178656-1201898
  Merged /accumulo/branches/1.3/src/server:r1349971
  Merged /accumulo/branches/1.4/server:r1341135-1342418,1342420-1343942,1349972,1351425
  Merged /accumulo/branches/1.4/src/server:r1342421-1343896,1343899-1343942,1349972,1351425
  Merged /accumulo/trunk/src/server:r1350779
  Merged /accumulo/trunk/server:r1343822-1353583

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Mon Jun 25 17:09:31 2012
@@ -57,6 +57,7 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.LoggingRunnable;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.StopWatch;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -138,7 +139,7 @@ public class BulkImporter {
       final Map<Path,List<TabletLocation>> assignments = Collections.synchronizedSortedMap(new TreeMap<Path,List<TabletLocation>>());
       
       timer.start(Timers.EXAMINE_MAP_FILES);
-      ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+      ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("findOverlapping"));
       
       for (Path path : paths) {
         final Path mapFile = path;
@@ -376,7 +377,7 @@ public class BulkImporter {
     
     final Map<Path,List<AssignmentInfo>> ais = Collections.synchronizedMap(new TreeMap<Path,List<AssignmentInfo>>());
     
-    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("estimateSizes"));
     
     for (final Entry<Path,List<TabletLocation>> entry : assignments.entrySet()) {
       if (entry.getValue().size() == 1) {
@@ -586,12 +587,11 @@ public class BulkImporter {
       apt.put(entry.getKey(), entry.getValue());
     }
     
-    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+    ExecutorService threadPool = Executors.newFixedThreadPool(numThreads, new NamingThreadFactory("submit"));
     
     for (Entry<String,Map<KeyExtent,List<PathSize>>> entry : assignmentsPerTabletServer.entrySet()) {
       String location = entry.getKey();
-      threadPool
-          .submit(new TraceRunnable(new LoggingRunnable(log, new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()))));
+      threadPool.submit(new AssignmentTask(credentials, assignmentFailures, tableName, location, entry.getValue()));
     }
     
     threadPool.shutdown();

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Mon Jun 25 17:09:31 2012
@@ -17,53 +17,49 @@
 package org.apache.accumulo.server.gc;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.UUID;
 
 import org.apache.accumulo.cloudtrace.instrument.Span;
 import org.apache.accumulo.cloudtrace.instrument.Trace;
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
-import org.apache.accumulo.core.tabletserver.thrift.MutationLogger;
-import org.apache.accumulo.core.tabletserver.thrift.MutationLogger.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.util.MetadataTable.LogEntry;
-import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 
 
 public class GarbageCollectWriteAheadLogs {
   private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
   
-  private final AccumuloConfiguration conf;
+  private final Instance instance;
   private final FileSystem fs;
   
-  GarbageCollectWriteAheadLogs(FileSystem fs, AccumuloConfiguration conf) {
+  GarbageCollectWriteAheadLogs(Instance instance, FileSystem fs) {
+    this.instance = instance;
     this.fs = fs;
-    this.conf = conf;
   }
 
   public void collect(GCStatus status) {
@@ -111,59 +107,51 @@ public class GarbageCollectWriteAheadLog
     }
   }
   
+  boolean holdsLock(InetSocketAddress addr) { // true when the ZTSERVERS node for addr has at least one child
+    try {
+      String zpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + org.apache.accumulo.core.util.AddressUtil.toString(addr);
+      List<String> children = ZooReaderWriter.getInstance().getChildren(zpath);
+      return !(children == null || children.isEmpty());
+    } catch (KeeperException.NoNodeException ex) {
+      return false; // no node exists for this server at all
+    } catch (Exception ex) {
+      log.debug(ex, ex);
+      return true; // any other failure is reported as "lock held"
+    }
+  }
+
   private int removeFiles(Map<String,ArrayList<String>> serverToFileMap, final GCStatus status) {
-    final AtomicInteger count = new AtomicInteger();
-    ExecutorService threadPool = java.util.concurrent.Executors.newCachedThreadPool();
-    
-    for (final Entry<String,ArrayList<String>> serverFiles : serverToFileMap.entrySet()) {
-      final String server = serverFiles.getKey();
-      final List<String> files = serverFiles.getValue();
-      threadPool.submit(new Runnable() {
-        @Override
-        public void run() {
+    AccumuloConfiguration conf = instance.getConfiguration();
+    for (Entry<String,ArrayList<String>> entry : serverToFileMap.entrySet()) {
+      if (entry.getKey().length() == 0) {
+        // old-style log entry, just remove it
+        for (String filename : entry.getValue()) {
+          log.debug("Removing old-style WAL " + entry.getValue());
           try {
-            Iface logger = ThriftUtil.getClient(new MutationLogger.Client.Factory(), server, Property.LOGGER_PORT, Property.TSERV_LOGGER_TIMEOUT, conf);
-            try {
-              count.addAndGet(files.size());
-              log.debug(String.format("removing %d files from %s", files.size(), server));
-              if (files.size() > 0) {
-                log.debug("deleting files on logger " + server);
-                for (String file : files) {
-                  log.debug("Deleting " + file);
-                }
-                logger.remove(null, SecurityConstants.getSystemCredentials(), files);
-                synchronized (status.currentLog) {
-                  status.currentLog.deleted += files.size();
-                }
-              }
-            } finally {
-              ThriftUtil.returnClient(logger);
-            }
-            log.info(String.format("Removed %d files from %s", files.size(), server));
-            for (String file : files) {
-              try {
-                for (FileStatus match : fs.globStatus(new Path(ServerConstants.getRecoveryDir(), file + "*"))) {
-                  fs.delete(match.getPath(), true);
-                }
-              } catch (IOException ex) {
-                log.warn("Error deleting recovery data: ", ex);
-              }
-            }
-          } catch (TTransportException err) {
-            log.info("Ignoring communication error talking to logger " + serverFiles.getKey() + " (probably a timeout)");
-          } catch (TException err) {
-            log.info("Ignoring exception talking to logger " + serverFiles.getKey() + "(" + err + ")");
+            fs.delete(new Path(Constants.getWalDirectory(conf), filename), true);
+          } catch (IOException ex) {
+            log.error("Unable to delete wal " + filename + ": " + ex);
           }
         }
-      });
-      
+      } else {
+        InetSocketAddress address = AddressUtil.parseAddress(entry.getKey(), Property.TSERV_CLIENTPORT);
+        if (!holdsLock(address))
+          continue;
+        Iface tserver = null;
+        try {
+          tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+          tserver.removeLogs(null, SecurityConstants.getSystemCredentials(), entry.getValue());
+          log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
+          status.currentLog.deleted += entry.getValue().size();
+        } catch (TException e) {
+          log.warn("Error talking to " + address + ": " + e);
+        } finally {
+          if (tserver != null)
+            ThriftUtil.returnClient(tserver);
+        }
+      }
     }
-    threadPool.shutdown();
-    while (!threadPool.isShutdown())
-      try {
-        threadPool.awaitTermination(1, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {}
-    return count.get();
+    return 0;
   }
   
   private static Map<String,ArrayList<String>> mapServersToFiles(Map<String,String> fileToServerMap) {
@@ -194,31 +182,41 @@ public class GarbageCollectWriteAheadLog
   }
   
   private int scanServers(Map<String,String> fileToServerMap) throws Exception {
-    int count = 0;
-    IZooReaderWriter zk = ZooReaderWriter.getInstance();
-    String loggersDir = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZLOGGERS;
-    List<String> servers = zk.getChildren(loggersDir, null);
-    Collections.shuffle(servers);
-    for (String server : servers) {
-      String address = "no-data";
-      count++;
-      try {
-        byte[] data = zk.getData(loggersDir + "/" + server, null);
-        address = new String(data);
-        Iface logger = ThriftUtil.getClient(new MutationLogger.Client.Factory(), address, Property.LOGGER_PORT, Property.TSERV_LOGGER_TIMEOUT, conf);
-        for (String log : logger.getClosedLogs(null, SecurityConstants.getSystemCredentials())) {
-          fileToServerMap.put(log, address);
+    AccumuloConfiguration conf = instance.getConfiguration();
+    Path walRoot = new Path(Constants.getWalDirectory(conf));
+    for (FileStatus status : fs.listStatus(walRoot)) {
+      String name = status.getPath().getName();
+      if (status.isDir()) {
+        for (FileStatus file : fs.listStatus(new Path(walRoot, name))) {
+          if (isUUID(file.getPath().getName()))
+            fileToServerMap.put(file.getPath().getName(), name);
+          else {
+            log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
+          }
         }
-        ThriftUtil.returnClient(logger);
-      } catch (TException err) {
-        log.warn("Ignoring exception talking to logger " + address);
-      }
-      if (SimpleGarbageCollector.almostOutOfMemory()) {
-        log.warn("Running out of memory collecting write-ahead log file names from loggers, continuing with a partial list");
-        break;
+      } else if (isUUID(name)) {
+        // old-style WAL are not under a directory
+        fileToServerMap.put(name, "");
+      } else {
+        log.info("Ignoring file " + name + " because it doesn't look like a uuid");
       }
     }
+
+    int count = 0;
     return count;
   }
   
+  /**
+   * @param name candidate name to test; scanServers passes file names found under the WAL directory
+   * @return true if UUID.fromString(name) succeeds, false if it throws IllegalArgumentException
+   */
+  static private boolean isUUID(String name) {
+    try {
+      UUID.fromString(name);
+      return true;
+    } catch (IllegalArgumentException ex) {
+      return false;
+    }
+  }
+  
 }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Mon Jun 25 17:09:31 2012
@@ -64,6 +64,7 @@ import org.apache.accumulo.core.master.s
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -297,7 +298,7 @@ public class SimpleGarbageCollector impl
       
       // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
-      GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(fs, instance.getConfiguration());
+      GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs);
       try {
         log.info("Beginning garbage collection of write-ahead logs");
         walogCollector.collect(status);
@@ -585,7 +586,7 @@ public class SimpleGarbageCollector impl
     
     final BatchWriter finalWriter = writer;
     
-    ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads);
+    ExecutorService deleteThreadPool = Executors.newFixedThreadPool(numDeleteThreads, new NamingThreadFactory("deleting"));
     
     for (final String delete : confirmedDeletes) {
       

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java Mon Jun 25 17:09:31 2012
@@ -16,7 +16,7 @@
  */
 package org.apache.accumulo.server.logger;
 
-import java.io.FileNotFoundException;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
@@ -36,9 +36,9 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 
 public class LogReader {
@@ -108,28 +108,33 @@ public class LogReader {
       
       if (fs.isFile(path)) {
         // read log entries from a simple hdfs file
-        org.apache.hadoop.io.SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(file), conf);
-        while (reader.next(key, value)) {
+        FSDataInputStream f = fs.open(path);
+        while (true) {
+          try {
+            key.readFields(f);
+            value.readFields(f);
+          } catch (EOFException ex) {
+            break;
+          }
           printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
         }
       } else if (local.isFile(path)) {
         // read log entries from a simple file
-        org.apache.hadoop.io.SequenceFile.Reader reader = new SequenceFile.Reader(local, new Path(file), conf);
-        while (reader.next(key, value)) {
+        FSDataInputStream f = fs.open(path);
+        while (true) {
+          try {
+            key.readFields(f);
+            value.readFields(f);
+          } catch (EOFException ex) {
+            break;
+          }
           printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
         }
       } else {
-        try {
-          // read the log entries sorted in a map file
-          MultiReader input = new MultiReader(fs, conf, file);
-          while (input.next(key, value)) {
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
-          }
-        } catch (FileNotFoundException ex) {
-          SequenceFile.Reader input = new SequenceFile.Reader(local, new Path(file), conf);
-          while (input.next(key, value)) {
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
-          }
+        // read the log entries sorted in a map file
+        MultiReader input = new MultiReader(fs, conf, file);
+        while (input.next(key, value)) {
+          printLogEvent(key, value, row, rowMatcher, ke, tabletIds, max);
         }
       }
     }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java Mon Jun 25 17:09:31 2012
@@ -133,15 +133,6 @@ public class LiveTServerSet implements W
       }
     }
     
-    public void useLoggers(Set<String> loggers) throws TException {
-      TabletClientService.Iface client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-      try {
-        client.useLoggers(null, SecurityConstants.getSystemCredentials(), loggers);
-      } finally {
-        ThriftUtil.returnClient(client);
-      }
-    }
-    
     public void chop(ZooLock lock, KeyExtent extent) throws TException {
       TabletClientService.Iface client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
       try {

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/Master.java Mon Jun 25 17:09:31 2012
@@ -69,7 +69,6 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.master.thrift.LoggerStatus;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.master.thrift.MasterClientService.Processor;
 import org.apache.accumulo.core.master.thrift.MasterGoalState;
@@ -86,28 +85,20 @@ import org.apache.accumulo.core.util.Byt
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Daemon;
-import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.Accumulo;
-import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.fate.Fate;
 import org.apache.accumulo.server.fate.TStore.TStatus;
 import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
-import org.apache.accumulo.server.master.CoordinateRecoveryTask.JobComplete;
-import org.apache.accumulo.server.master.CoordinateRecoveryTask.LogFile;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
-import org.apache.accumulo.server.master.TabletServerLoggers.LoggerWatcher;
 import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
-import org.apache.accumulo.server.master.balancer.LoggerBalancer;
-import org.apache.accumulo.server.master.balancer.LoggerUser;
-import org.apache.accumulo.server.master.balancer.SimpleLoggerBalancer;
-import org.apache.accumulo.server.master.balancer.TServerUsesLoggers;
 import org.apache.accumulo.server.master.balancer.TabletBalancer;
+import org.apache.accumulo.server.master.recovery.RecoverLease;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.CurrentState;
 import org.apache.accumulo.server.master.state.DeadServerList;
@@ -145,7 +136,6 @@ import org.apache.accumulo.server.securi
 import org.apache.accumulo.server.security.SecurityOperationImpl;
 import org.apache.accumulo.server.security.SecurityUtil;
 import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
 import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.accumulo.server.util.DefaultMap;
@@ -175,17 +165,19 @@ import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
 
 /**
- * The Master is responsible for assigning and balancing tablets and loggers to tablet servers.
+ * The Master is responsible for assigning and balancing tablets to tablet servers.
  * 
  * The master will also coordinate log recoveries and reports general status.
  */
-public class Master implements LiveTServerSet.Listener, LoggerWatcher, TableObserver, CurrentState, JobComplete {
+public class Master implements LiveTServerSet.Listener, TableObserver, CurrentState {
   
   final private static Logger log = Logger.getLogger(Master.class);
   
@@ -214,8 +206,6 @@ public class Master implements LiveTServ
   
   private ZooLock masterLock = null;
   private TServer clientService = null;
-  private TabletServerLoggers loggers = null;
-  private CoordinateRecoveryTask recovery = null;
   private TabletBalancer tabletBalancer;
   
   private MasterState state = MasterState.INITIAL;
@@ -225,8 +215,8 @@ public class Master implements LiveTServ
   volatile private SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections
       .unmodifiableSortedMap(new TreeMap<TServerInstance,TabletServerStatus>());
   
-  private LoggerBalancer loggerBalancer;
-  
+  private Set<String> recoveriesInProgress = Collections.synchronizedSet(new HashSet<String>());
+
   synchronized private MasterState getMasterState() {
     return state;
   }
@@ -512,7 +502,6 @@ public class Master implements LiveTServ
     tserverSet = new LiveTServerSet(instance, config.getConfiguration(), this);
     this.tabletBalancer = createInstanceFromPropertyName(aconf, Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer());
     this.tabletBalancer.init(serverConfig);
-    this.loggerBalancer = createInstanceFromPropertyName(aconf, Property.MASTER_LOGGER_BALANCER, LoggerBalancer.class, new SimpleLoggerBalancer());
   }
   
   public TServerConnection getConnection(TServerInstance server) {
@@ -672,11 +661,6 @@ public class Master implements LiveTServ
     @Override
     public MasterMonitorInfo getMasterStats(TInfo info, AuthInfo credentials) throws ThriftSecurityException, TException {
       final MasterMonitorInfo result = new MasterMonitorInfo();
-      result.loggers = new ArrayList<LoggerStatus>();
-      for (String logger : loggers.getLoggersFromZooKeeper().values()) {
-        result.loggers.add(new LoggerStatus(logger));
-      }
-      result.recovery = recovery.status();
       
       result.tServerInfo = new ArrayList<TabletServerStatus>();
       result.tableMap = new DefaultMap<String,TableInfo>(new TableInfo());
@@ -705,8 +689,6 @@ public class Master implements LiveTServ
       }
       DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS);
       result.deadTabletServers = obit.getList();
-      obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADLOGGERS);
-      result.deadLoggers = obit.getList();
       return result;
     }
     
@@ -823,10 +805,6 @@ public class Master implements LiveTServ
         balancer.init(serverConfig);
         tabletBalancer = balancer;
         log.info("tablet balancer changed to " + tabletBalancer.getClass().getName());
-      } else if (property.equals(Property.MASTER_LOGGER_BALANCER.getKey())) {
-        loggerBalancer = createInstanceFromPropertyName(instance.getConfiguration(), Property.MASTER_LOGGER_BALANCER, LoggerBalancer.class,
-            new SimpleLoggerBalancer());
-        log.info("log balancer changed to " + loggerBalancer.getClass().getName());
       }
     }
 
@@ -1064,6 +1042,7 @@ public class Master implements LiveTServ
       fate.delete(opid);
       
     }
+    
   }
   
   public MergeInfo getMergeInfo(Text tableId) {
@@ -1348,9 +1327,8 @@ public class Master implements LiveTServ
             
             if (goal == TabletGoalState.HOSTED) {
               if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
-                if (!recovery.recover(SecurityConstants.getSystemCredentials(), tls.extent, tls.walogs, Master.this)) {
+                if (recoverLogs(tls.extent, tls.walogs))
                   continue;
-                }
               }
               switch (state) {
                 case HOSTED:
@@ -1934,7 +1912,6 @@ public class Master implements LiveTServ
       } else if (getMasterGoalState() == MasterGoalState.CLEAN_STOP) {
         log.debug("not balancing because the master is attempting to stop cleanly");
       } else {
-        balanceLoggers();
         return balanceTablets();
       }
       return DEFAULT_WAIT_FOR_WATCHER;
@@ -1967,28 +1944,6 @@ public class Master implements LiveTServ
       }
     }
     
-    private void balanceLoggers() {
-      List<LoggerUser> logUsers = new ArrayList<LoggerUser>();
-      for (Entry<TServerInstance,TabletServerStatus> entry : tserverStatus.entrySet()) {
-        logUsers.add(new TServerUsesLoggers(entry.getKey(), entry.getValue()));
-      }
-      List<String> logNames = new ArrayList<String>(loggers.getLoggersFromZooKeeper().values());
-      Map<LoggerUser,List<String>> assignmentsOut = new HashMap<LoggerUser,List<String>>();
-      int loggersPerServer = getSystemConfiguration().getCount(Property.TSERV_LOGGER_COUNT);
-      loggerBalancer.balance(logUsers, logNames, assignmentsOut, loggersPerServer);
-      for (Entry<LoggerUser,List<String>> entry : assignmentsOut.entrySet()) {
-        TServerUsesLoggers tserver = (TServerUsesLoggers) entry.getKey();
-        try {
-          log.debug("Telling " + tserver.getInstance() + " to use loggers " + entry.getValue());
-          TServerConnection connection = tserverSet.getConnection(tserver.getInstance());
-          if (connection != null)
-            connection.useLoggers(new HashSet<String>(entry.getValue()));
-        } catch (Exception ex) {
-          log.warn("Unable to talk to " + tserver.getInstance(), ex);
-        }
-      }
-    }
-    
     private long balanceTablets() {
       List<TabletMigration> migrationsOut = new ArrayList<TabletMigration>();
       Set<KeyExtent> migrationsCopy = new HashSet<KeyExtent>();
@@ -2023,7 +1978,7 @@ public class Master implements LiveTServ
         result.put(server, status);
         // TODO maybe remove from bad servers
       } catch (Exception ex) {
-        log.error("unable to get tablet server status " + server + " " + ex.getMessage());
+        log.error("unable to get tablet server status " + server + " " + ex.toString());
         log.debug("unable to get tablet server status " + server, ex);
         if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
           log.warn("attempting to stop " + server);
@@ -2048,6 +2003,31 @@ public class Master implements LiveTServ
     return result;
   }
   
+  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
+    boolean recoveryNeeded = false;
+    for (Collection<String> logs : walogs) {
+      for (String log : logs) {
+        String parts[] = log.split("/");
+        String host = parts[0];
+        String filename = parts[1];
+        if (fs.exists(new Path(Constants.getRecoveryDir(getSystemConfiguration()) + "/" + filename + "/finished"))) {
+          recoveriesInProgress.remove(filename);
+          continue;
+        }
+        recoveryNeeded = true;
+        synchronized (recoveriesInProgress) {
+          if (!recoveriesInProgress.contains(filename)) {
+            Master.log.info("Starting recovery of " + filename + " created for " + host + ", tablet " + extent + " holds a reference");
+            long tid = fate.startTransaction();
+            fate.seedTransaction(tid, new RecoverLease(host, filename), true);
+            recoveriesInProgress.add(filename);
+          }
+        }
+      }
+    }
+    return recoveryNeeded;
+  }
+
   public void run() throws IOException, InterruptedException, KeeperException {
     final String zroot = ZooUtil.getRoot(instance);
     
@@ -2055,13 +2035,6 @@ public class Master implements LiveTServ
     
     TableManager.getInstance().addObserver(this);
     
-    recovery = new CoordinateRecoveryTask(fs, getSystemConfiguration());
-    Thread recoveryThread = new Daemon(new LoggingRunnable(log, recovery), "Recovery Status");
-    recoveryThread.start();
-    
-    loggers = new TabletServerLoggers(this, getSystemConfiguration());
-    loggers.scanZooKeeperForUpdates();
-    
     StatusThread statusThread = new StatusThread();
     statusThread.start();
     
@@ -2069,6 +2042,19 @@ public class Master implements LiveTServ
     migrationCleanupThread.start();
     
     tserverSet.startListeningForTabletServerChanges();
+
+    ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        nextEvent.event("Noticed recovery changes", event.getType());
+        try {
+          // watcher only fires once, add it back
+          ZooReaderWriter.getInstance().getChildren(zroot + Constants.ZRECOVERY, this);
+        } catch (Exception e) {
+          log.error("Failed to add log recovery watcher back", e);
+        }
+      }
+    });
     
     AuthInfo systemAuths = SecurityConstants.getSystemCredentials();
     final TabletStateStore stores[] = {new ZooTabletStateStore(new ZooStore(zroot)), new RootTabletStateStore(instance, systemAuths, this),
@@ -2104,9 +2090,6 @@ public class Master implements LiveTServ
     final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
     statusThread.join(remaining(deadline));
     
-    recovery.stop();
-    recoveryThread.join(remaining(deadline));
-    
     // quit, even if the tablet servers somehow jam up and the watchers
     // don't stop
     for (TabletGroupWatcher watcher : watchers) {
@@ -2132,7 +2115,8 @@ public class Master implements LiveTServ
     };
     long current = System.currentTimeMillis();
     final long waitTime = getSystemConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
-    final String masterClientAddress = hostname + ":" + getSystemConfiguration().getPort(Property.MASTER_CLIENTPORT);
+    final String masterClientAddress = org.apache.accumulo.core.util.AddressUtil.toString(new InetSocketAddress(hostname, getSystemConfiguration().getPort(
+        Property.MASTER_CLIENTPORT)));
     
     boolean locked = false;
     while (System.currentTimeMillis() - current < waitTime) {
@@ -2173,42 +2157,9 @@ public class Master implements LiveTServ
     }
   }
   
-  @Override
-  public void newLogger(String address) {
-    try {
-      RemoteLogger remote = new RemoteLogger(address, getSystemConfiguration());
-      for (String onDisk : remote.getClosedLogs()) {
-        Path path = new Path(ServerConstants.getRecoveryDir(), onDisk + ".failed");
-        if (fs.exists(path)) {
-          fs.delete(path, true);
-        }
-      }
-    } catch (Exception ex) {
-      log.warn("Unexpected error clearing failed recovery markers for new logger", ex);
-    }
-    DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADLOGGERS);
-    obit.delete(address);
-    nextEvent.event("Added logger %s", address);
-  }
-  
   static final String I_DONT_KNOW_WHY = "unexpected failure";
   
   @Override
-  public void deadLogger(String address) {
-    DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADLOGGERS);
-    InetSocketAddress parseAddress = AddressUtil.parseAddress(address, Property.LOGGER_PORT);
-    String cause = I_DONT_KNOW_WHY;
-    for (TServerInstance server : serversToShutdown) {
-      if (server.getLocation().getHostName().equals(parseAddress.getHostName())) {
-        cause = "clean shutdown";
-        break;
-      }
-    }
-    obit.post(address, cause);
-    log.info("Noticed logger went away: " + address);
-  }
-  
-  @Override
   public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
     DeadServerList obit = new DeadServerList(ZooUtil.getRoot(instance) + Constants.ZDEADTSERVERS);
     if (added.size() > 0) {
@@ -2283,10 +2234,6 @@ public class Master implements LiveTServ
     return tserverSet.getCurrentServers();
   }
   
-  public Map<String,String> getLoggers() {
-    return loggers.getLoggersFromZooKeeper();
-  }
-  
   @Override
   public Collection<MergeInfo> merges() {
     List<MergeInfo> result = new ArrayList<MergeInfo>();
@@ -2296,11 +2243,6 @@ public class Master implements LiveTServ
     return result;
   }
   
-  @Override
-  public void finished(LogFile entry) {
-    nextEvent.event("Log recovery %s complete ", entry);
-  }
-  
   public void killTServer(TServerInstance server) {
     nextEvent.event("Forcing server down %s", server);
     serversToShutdown.add(server);
@@ -2332,4 +2274,7 @@ public class Master implements LiveTServ
     return this.fs;
   }
 
+  public void updateRecoveryInProgress(String file) {
+    recoveriesInProgress.add(file);
+  }
 }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Mon Jun 25 17:09:31 2012
@@ -27,10 +27,7 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.cloudtrace.instrument.TraceExecutorService;
 import org.apache.accumulo.core.Constants;
@@ -49,8 +46,8 @@ import org.apache.accumulo.core.file.Fil
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -367,17 +364,8 @@ class LoadFiles extends MasterRepo {
   
   synchronized void initializeThreadPool(Master master) {
     if (threadPool == null) {
-      int THREAD_POOL_SIZE = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
-      ThreadFactory threadFactory = new ThreadFactory() {
-        int count = 0;
-        
-        @Override
-        public Thread newThread(Runnable r) {
-          return new Daemon(r, "bulk loader " + count++);
-        }
-      };
-      ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 1l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
-          threadFactory);
+      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
+      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
       pool.allowCoreThreadTimeOut(true);
       threadPool = new TraceExecutorService(pool);
     }

Modified: accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java?rev=1353663&r1=1353662&r2=1353663&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java (original)
+++ accumulo/branches/ACCUMULO-259/server/src/main/java/org/apache/accumulo/server/master/tserverOps/ShutdownTServer.java Mon Jun 25 17:09:31 2012
@@ -22,9 +22,9 @@ import org.apache.accumulo.core.util.Add
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.fate.Repo;
-import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.EventCoordinator.Listener;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
+import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.tableOps.MasterRepo;
 import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
@@ -60,7 +60,7 @@ public class ShutdownTServer extends Mas
       path = ZooUtil.getRoot(m.getInstance()) + Constants.ZDEADTSERVERS + "/" + tserver;
       IZooReaderWriter zoo = ZooReaderWriter.getInstance();
       zoo.putPersistentData(path, "forced down".getBytes(), NodeExistsPolicy.OVERWRITE);
-      return new DisconnectLogger(server.getLocation().getAddress().getHostAddress());
+      return null;
     }
     
     // TODO move this to isReady() and drop while loop?
@@ -86,7 +86,7 @@ public class ShutdownTServer extends Mas
       listener.waitForEvents(1000);
     }
     
-    return new DisconnectLogger(server.getLocation().getAddress().getHostAddress());
+    return null;
   }
   
   @Override



Mime
View raw message