accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1458882 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/util/shell/ core/src/main/java/org/apache/accumulo/core/util/shell/commands/ fate/src/main/java/org/apache/accumulo/fate/ server/src/main/java/org/apache/accumulo/se...
Date Wed, 20 Mar 2013 15:20:19 GMT
Author: kturner
Date: Wed Mar 20 15:20:18 2013
New Revision: 1458882

URL: http://svn.apache.org/r1458882
Log:
ACCUMULO-842 applied patch from Damon Brown

Added:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/FateCommand.java
Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java?rev=1458882&r1=1458881&r2=1458882&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/Shell.java Wed Mar 20 15:20:18 2013
@@ -93,6 +93,7 @@ import org.apache.accumulo.core.util.she
 import org.apache.accumulo.core.util.shell.commands.ExecfileCommand;
 import org.apache.accumulo.core.util.shell.commands.ExitCommand;
 import org.apache.accumulo.core.util.shell.commands.ExportTableCommand;
+import org.apache.accumulo.core.util.shell.commands.FateCommand;
 import org.apache.accumulo.core.util.shell.commands.FlushCommand;
 import org.apache.accumulo.core.util.shell.commands.FormatterCommand;
 import org.apache.accumulo.core.util.shell.commands.GetAuthsCommand;
@@ -351,8 +352,8 @@ public class Shell extends ShellOptions 
     Command[] otherCommands = {new HiddenCommand()};
     Command[] permissionsCommands = {new GrantCommand(), new RevokeCommand(), new SystemPermissionsCommand(), new TablePermissionsCommand(),
         new UserPermissionsCommand()};
-    Command[] stateCommands = {new AuthenticateCommand(), new ClsCommand(), new ClearCommand(), new NoTableCommand(), new SleepCommand(), new TableCommand(),
-        new UserCommand(), new WhoAmICommand()};
+    Command[] stateCommands = {new AuthenticateCommand(), new ClsCommand(), new ClearCommand(), new FateCommand(), new NoTableCommand(), new SleepCommand(),
+        new TableCommand(), new UserCommand(), new WhoAmICommand()};
     Command[] tableCommands = {new CloneTableCommand(), new ConfigCommand(), new CreateTableCommand(), new DeleteTableCommand(), new DropTableCommand(),
         new DUCommand(), new ExportTableCommand(), new ImportTableCommand(), new OfflineCommand(), new OnlineCommand(), new RenameTableCommand(),
         new TablesCommand()};
@@ -412,6 +413,10 @@ public class Shell extends ShellOptions 
     return connector;
   }
   
+  public Instance getInstance() {
+    return instance;
+  }
+  
   public static void main(String args[]) throws IOException {
     Shell shell = new Shell();
     shell.config(args);
@@ -444,8 +449,12 @@ public class Shell extends ShellOptions 
     
     if (execFile != null) {
       java.util.Scanner scanner = new java.util.Scanner(new File(execFile));
-      while (scanner.hasNextLine())
-        execCommand(scanner.nextLine(), true, isVerbose());
+      try {
+        while (scanner.hasNextLine())
+          execCommand(scanner.nextLine(), true, isVerbose());
+      } finally {
+        scanner.close();
+      }
     } else if (execCommand != null) {
       for (String command : execCommand.split("\n")) {
         execCommand(command, true, isVerbose());
@@ -515,6 +524,7 @@ public class Shell extends ShellOptions 
     return connector.whoami() + "@" + connector.getInstance().getInstanceName() + (getTableName().isEmpty() ? "" : " ") + getTableName() + "> ";
   }
   
+  @SuppressWarnings("deprecation")
   public void execCommand(String input, boolean ignoreAuthTimeout, boolean echoPrompt) throws IOException {
     audit.log(AuditLevel.AUDIT, getDefaultPrompt() + input);
     if (echoPrompt) {
@@ -819,7 +829,7 @@ public class Shell extends ShellOptions 
       writer.close();
     }
   };
-  
+
   public final void printLines(Iterator<String> lines, boolean paginate) throws IOException {
     printLines(lines, paginate, null);
   }

Added: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/FateCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/FateCommand.java?rev=1458882&view=auto
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/FateCommand.java (added)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/FateCommand.java Wed Mar 20 15:20:18 2013
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.shell.commands;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Formatter;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.shell.Shell;
+import org.apache.accumulo.core.util.shell.Shell.Command;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.AdminUtil;
+import org.apache.accumulo.fate.TStore.TStatus;
+import org.apache.accumulo.fate.ZooStore;
+import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Manage FATE transactions
+ * 
+ */
+public class FateCommand extends Command {
+  
+  private static final String SCHEME = "digest";
+  
+  private static final String USER = "accumulo";
+  
+  private Option secretOption;
+  private Option statusOption;
+  
+  @Override
+  public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws ParseException, KeeperException, InterruptedException,
+      IOException {
+    Instance instance = shellState.getInstance();
+    String[] args = cl.getArgs();
+    if (args.length <= 0) {
+      throw new ParseException("Must provide a command to execute");
+    }
+    String cmd = args[0];
+    boolean failedCommand = false;
+    
+    AdminUtil<FateCommand> admin = new AdminUtil<FateCommand>(false);
+    
+    String path = ZooUtil.getRoot(instance) + Constants.ZFATE;
+    String masterPath = ZooUtil.getRoot(instance) + Constants.ZMASTER_LOCK;
+    IZooReaderWriter zk = getZooReaderWriter(cl.getOptionValue('s'));
+    ZooStore<FateCommand> zs = new ZooStore<FateCommand>(path, zk);
+    
+    if ("fail".equals(cmd)) {
+      if (args.length <= 1) {
+        throw new ParseException("Must provide transaction ID");
+      }
+      for (int i = 1; i < args.length; i++) {
+        if (!admin.prepFail(zs, zk, masterPath, args[i])) {
+          System.out.printf("Could not fail transaction: %s%n", args[i]);
+          failedCommand = true;
+        }
+      }
+    } else if ("delete".equals(cmd)) {
+      if (args.length <= 1) {
+        throw new ParseException("Must provide transaction ID");
+      }
+      for (int i = 1; i < args.length; i++) {
+        if (admin.prepDelete(zs, zk, masterPath, args[i])) {
+          admin.deleteLocks(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, args[i]);
+        } else {
+          System.out.printf("Could not delete transaction: %s%n", args[i]);
+          failedCommand = true;
+        }
+      }
+    } else if ("list".equals(cmd) || "print".equals(cmd)) {
+      // Parse transaction ID filters for print display
+      Set<Long> filterTxid = null;
+      if (args.length >= 2) {
+        filterTxid = new HashSet<Long>(args.length);
+        for (int i = 1; i < args.length; i++) {
+          try {
+            Long val = Long.parseLong(args[i], 16);
+            filterTxid.add(val);
+          } catch (NumberFormatException nfe) {
+            // Failed to parse, will exit instead of displaying everything since the intention was to potentially filter some data
+            System.out.printf("Invalid transaction ID format: %s%n", args[i]);
+            return 1;
+          }
+        }
+      }
+      
+      // Parse TStatus filters for print display
+      EnumSet<TStatus> filterStatus = null;
+      if (cl.hasOption('t')) {
+        filterStatus = EnumSet.noneOf(TStatus.class);
+        String[] tstat = cl.getOptionValues('t');
+        for (int i = 0; i < tstat.length; i++) {
+          try {
+            filterStatus.add(TStatus.valueOf(tstat[i]));
+          } catch (IllegalArgumentException iae) {
+            System.out.printf("Invalid transaction status name: %s%n", tstat[i]);
+            return 1;
+          }
+        }
+      }
+      
+      StringBuilder buf = new StringBuilder(8096);
+      Formatter fmt = new Formatter(buf);
+      admin.print(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, fmt, filterTxid, filterStatus);
+      shellState.printLines(Collections.singletonList(buf.toString()).iterator(), true);
+    } else {
+      throw new ParseException("Invalid command option");
+    }
+    
+    return failedCommand ? 1 : 0;
+  }
+  
+  @SuppressWarnings("deprecation")
+  protected synchronized IZooReaderWriter getZooReaderWriter(String secret) {
+    AccumuloConfiguration conf = AccumuloConfiguration.getSiteConfiguration();
+    if (secret == null) {
+      secret = conf.get(Property.INSTANCE_SECRET);
+    }
+    return new ZooReaderWriter(conf.get(Property.INSTANCE_ZK_HOST), (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), SCHEME,
+        (USER + ":" + secret).getBytes());
+  }
+  
+  @Override
+  public String description() {
+    return "manage FATE transactions";
+  }
+  
+  @Override
+  public String usage() {
+    return getName() + " fail <txid>... | delete <txid>... | print [<txid>...]";
+  }
+  
+  @Override
+  public Options getOptions() {
+    final Options o = new Options();
+    secretOption = new Option("s", "secret", true, "specify the instance secret to use");
+    secretOption.setOptionalArg(false);
+    o.addOption(secretOption);
+    statusOption = new Option("t", "status-type", true,
+        "filter 'print' on the transaction status type(s) {NEW, IN_PROGRESS, FAILED_IN_PROGRESS, FAILED, SUCCESSFUL}");
+    statusOption.setArgs(Option.UNLIMITED_VALUES);
+    statusOption.setOptionalArg(false);
+    o.addOption(statusOption);
+    return o;
+  }
+  
+  @Override
+  public int numArgs() {
+    // Arg length varies between 1 to n
+    return -1;
+  }
+}

Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java?rev=1458882&r1=1458881&r2=1458882&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/AdminUtil.java Wed Mar 20 15:20:18 2013
@@ -18,10 +18,13 @@ package org.apache.accumulo.fate;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Formatter;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.accumulo.fate.TStore.TStatus;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -33,7 +36,33 @@ import org.apache.zookeeper.KeeperExcept
  * A utility to administer FATE operations
  */
 public class AdminUtil<T> {
+  
+  private boolean exitOnError = false;
+  
+  /**
+   * Default constructor
+   */
+  public AdminUtil() {
+    this(true);
+  }
+  
+  /**
+   * Constructor
+   * 
+   * @param exitOnError
+   *          <code>System.exit(1)</code> on error if true
+   */
+  public AdminUtil(boolean exitOnError) {
+    super();
+    this.exitOnError = exitOnError;
+  }
+  
   public void print(ZooStore<T> zs, IZooReaderWriter zk, String lockPath) throws KeeperException, InterruptedException {
+    print(zs, zk, lockPath, new Formatter(System.out), null, null);
+  }
+  
+  public void print(ZooStore<T> zs, IZooReaderWriter zk, String lockPath, Formatter fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
+      throws KeeperException, InterruptedException {
     Map<Long,List<String>> heldLocks = new HashMap<Long,List<String>>();
     Map<Long,List<String>> waitingLocks = new HashMap<Long,List<String>>();
     
@@ -84,12 +113,13 @@ public class AdminUtil<T> {
         
       } catch (Exception e) {
         e.printStackTrace();
-        System.err.println("Failed to read locks for " + id + " continuing");
+        fmt.format("Failed to read locks for %s continuing", id);
       }
     }
     
     List<Long> transactions = zs.list();
     
+    long txCount = 0;
     for (Long tid : transactions) {
       
       zs.reserve(tid);
@@ -114,37 +144,99 @@ public class AdminUtil<T> {
       
       zs.unreserve(tid, 0);
       
-      System.out.printf("txid: %016x  status: %-18s  op: %-15s  locked: %-15s locking: %-15s top: %s%n", tid, status, debug, hlocks, wlocks, top);
+      if ((filterTxid != null && !filterTxid.contains(tid)) || (filterStatus != null && !filterStatus.contains(status)))
+        continue;
+      
+      ++txCount;
+      fmt.format("txid: %016x  status: %-18s  op: %-15s  locked: %-15s locking: %-15s top: %s%n", tid, status, debug, hlocks, wlocks, top);
     }
+    fmt.format(" %s transactions", txCount);
     
     if (heldLocks.size() != 0 || waitingLocks.size() != 0) {
-      System.out.println();
-      System.out.println("The following locks did not have an associated FATE operation");
-      System.out.println();
+      fmt.format("%nThe following locks did not have an associated FATE operation%n");
       for (Entry<Long,List<String>> entry : heldLocks.entrySet())
-        System.out.printf("txid: %016x  locked: %s%n", entry.getKey(), entry.getValue());
+        fmt.format("txid: %016x  locked: %s%n", entry.getKey(), entry.getValue());
       
       for (Entry<Long,List<String>> entry : waitingLocks.entrySet())
-        System.out.printf("txid: %016x  locking: %s%n", entry.getKey(), entry.getValue());
+        fmt.format("txid: %016x  locking: %s%n", entry.getKey(), entry.getValue());
     }
   }
   
-  public void prepDelete(ZooStore<T> zs, IZooReaderWriter zk, String path, String txidStr) {
-    checkGlobalLock(zk, path);
+  public boolean prepDelete(ZooStore<T> zs, IZooReaderWriter zk, String path, String txidStr) {
+    if (!checkGlobalLock(zk, path)) {
+      return false;
+    }
     
-    long txid = Long.parseLong(txidStr, 16);
+    long txid;
+    try {
+      txid = Long.parseLong(txidStr, 16);
+    } catch (NumberFormatException nfe) {
+      System.out.printf("Invalid transaction ID format: %s%n", txidStr);
+      return false;
+    }
+    boolean state = false;
     zs.reserve(txid);
-    zs.delete(txid);
+    TStatus ts = zs.getStatus(txid);
+    switch (ts) {
+      case UNKNOWN:
+        System.out.printf("Invalid transaction ID: %016x%n", txid);
+        break;
+      
+      case IN_PROGRESS:
+      case NEW:
+      case FAILED:
+      case FAILED_IN_PROGRESS:
+      case SUCCESSFUL:
+        System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts);
+        zs.delete(txid);
+        state = true;
+        break;
+    }
+    
     zs.unreserve(txid, 0);
+    return state;
   }
   
-  public void prepFail(ZooStore<T> zs, IZooReaderWriter zk, String path, String txidStr) {
-    checkGlobalLock(zk, path);
+  public boolean prepFail(ZooStore<T> zs, IZooReaderWriter zk, String path, String txidStr) {
+    if (!checkGlobalLock(zk, path)) {
+      return false;
+    }
     
-    long txid = Long.parseLong(txidStr, 16);
+    long txid;
+    try {
+      txid = Long.parseLong(txidStr, 16);
+    } catch (NumberFormatException nfe) {
+      System.out.printf("Invalid transaction ID format: %s%n", txidStr);
+      return false;
+    }
+    boolean state = false;
     zs.reserve(txid);
-    zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+    TStatus ts = zs.getStatus(txid);
+    switch (ts) {
+      case UNKNOWN:
+        System.out.printf("Invalid transaction ID: %016x%n", txid);
+        break;
+      
+      case IN_PROGRESS:
+      case NEW:
+        System.out.printf("Failing transaction: %016x (%s)%n", txid, ts);
+        zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+        state = true;
+        break;
+      
+      case SUCCESSFUL:
+        System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts);
+        break;
+      
+      case FAILED:
+      case FAILED_IN_PROGRESS:
+        System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts);
+        state = true;
+        break;
+    }
+    
     zs.unreserve(txid, 0);
+    return state;
   }
   
   public void deleteLocks(ZooStore<T> zs, IZooReaderWriter zk, String path, String txidStr) throws KeeperException, InterruptedException {
@@ -163,18 +255,28 @@ public class AdminUtil<T> {
     }
   }
   
-  public void checkGlobalLock(IZooReaderWriter zk, String path) {
+  public boolean checkGlobalLock(IZooReaderWriter zk, String path) {
     try {
       if (ZooLock.getLockData(zk.getZooKeeper(), path) != null) {
         System.err.println("ERROR: Master lock is held, not running");
-        System.exit(-1);
+        if (this.exitOnError)
+          System.exit(1);
+        else
+          return false;
       }
     } catch (KeeperException e) {
       System.err.println("ERROR: Could not read master lock, not running " + e.getMessage());
-      System.exit(-1);
+      if (this.exitOnError)
+        System.exit(1);
+      else
+        return false;
     } catch (InterruptedException e) {
       System.err.println("ERROR: Could not read master lock, not running" + e.getMessage());
-      System.exit(-1);
+      if (this.exitOnError)
+        System.exit(1);
+      else
+        return false;
     }
+    return true;
   }
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java?rev=1458882&r1=1458881&r2=1458882&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fate/Admin.java Wed Mar 20 15:20:18 2013
@@ -34,28 +34,24 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 
-
 /**
  * A utility to administer FATE operations
  */
 public class Admin {
   
   static class TxOpts {
-    @Parameter(description="<txid>", required=true)
+    @Parameter(description = "<txid>", required = true)
     List<String> args = new ArrayList<String>();
   }
   
-  @Parameters(commandDescription="Stop an existing FATE by transaction id")
-  static class FailOpts extends TxOpts {
-  }
+  @Parameters(commandDescription = "Stop an existing FATE by transaction id")
+  static class FailOpts extends TxOpts {}
   
-  @Parameters(commandDescription="Delete an existing FATE by transaction id")
-  static class DeleteOpts extends TxOpts {
-  }
+  @Parameters(commandDescription = "Delete an existing FATE by transaction id")
+  static class DeleteOpts extends TxOpts {}
   
-  @Parameters(commandDescription="List the existing FATE transactions")
-  static class PrintOpts {
-  }
+  @Parameters(commandDescription = "List the existing FATE transactions")
+  static class PrintOpts {}
   
   public static void main(String[] args) throws Exception {
     Help opts = new Help();
@@ -67,9 +63,11 @@ public class Admin {
     jc.parse(args);
     if (opts.help || jc.getParsedCommand() == null) {
       jc.usage();
-      System.exit(-1);
+      System.exit(1);
     }
     
+    System.err.printf("This tool has been deprecated%nFATE administration now available within 'accumulo shell'%n$ fate fail <txid>... | delete <txid>... | print [<txid>...]%n%n");
+    
     AdminUtil<Master> admin = new AdminUtil<Master>();
     
     Instance instance = HdfsZooInstance.getInstance();



Mime
View raw message