accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject git commit: ACCUMULO-1650 improved a few admin commands and also made them easier to find and run
Date Thu, 05 Sep 2013 02:00:22 GMT
Updated Branches:
  refs/heads/master 0cf2ff72d -> 53a29837d


ACCUMULO-1650 improved a few admin commands and also made them easier to find and run


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/53a29837
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/53a29837
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/53a29837

Branch: refs/heads/master
Commit: 53a29837df77d8cc57f7973770865ccf0354a4fc
Parents: 0cf2ff7
Author: Keith Turner <kturner@apache.org>
Authored: Wed Sep 4 21:38:59 2013 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Wed Sep 4 21:38:59 2013 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/server/util/Admin.java  |  91 +++++++++-
 .../server/util/FindOfflineTablets.java         |  74 +++++++-
 .../accumulo/server/util/ListInstances.java     |  53 +++---
 .../util/RemoveEntriesForMissingFiles.java      | 176 ++++++++++++++++---
 4 files changed, 339 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/53a29837/server/src/main/java/org/apache/accumulo/server/util/Admin.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/src/main/java/org/apache/accumulo/server/util/Admin.java
index 05452eb..8c797cd 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.InstanceOperations;
 import org.apache.accumulo.core.client.impl.ClientExec;
 import org.apache.accumulo.core.client.impl.MasterClient;
 import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
@@ -57,24 +58,57 @@ public class Admin {
     List<String> args = new ArrayList<String>();
   }
   
+  @Parameters(commandDescription = "Ping tablet servers.  If no arguments, pings all.")
+  static class PingCommand {
+    @Parameter(description = "{<host> ... }")
+    List<String> args = new ArrayList<String>();
+  }
+  
+  @Parameters(commandDescription = "print tablets that are offline in online tables")
+  static class CheckTabletsCommand {
+    @Parameter(names = "--fixFiles", description = "Remove dangling file pointers")
+    boolean fixFiles = false;
+    
+    @Parameter(names = {"-t", "--table"}, description = "Table to check, if not set checks
all tables")
+    String table = null;
+  }
+
   @Parameters(commandDescription = "stop the master")
   static class StopMasterCommand {}
   
   @Parameters(commandDescription = "stop all the servers")
   static class StopAllCommand {}
   
+  @Parameters(commandDescription = "list Accumulo instances in zookeeper")
+  static class ListInstancesCommand {
+    @Parameter(names = "--print-errors", description = "display errors while listing instances")
+    boolean printErrors = false;
+    @Parameter(names = "--print-all", description = "print information for all instances,
not just those with names")
+    boolean printAll = false;
+  }
+
   public static void main(String[] args) {
     boolean everything;
     
     AdminOpts opts = new AdminOpts();
     JCommander cl = new JCommander(opts);
     cl.setProgramName(Admin.class.getName());
+
+    CheckTabletsCommand checkTabletsCommand = new CheckTabletsCommand();
+    cl.addCommand("checkTablets", checkTabletsCommand);
+
+    ListInstancesCommand listIntancesOpts = new ListInstancesCommand();
+    cl.addCommand("listInstances", listIntancesOpts);
+    
+    PingCommand pingCommand = new PingCommand();
+    cl.addCommand("ping", pingCommand);
+
     StopCommand stopOpts = new StopCommand();
     cl.addCommand("stop", stopOpts);
-    StopMasterCommand stopMasterOpts = new StopMasterCommand();
-    cl.addCommand("stopMaster", stopMasterOpts);
     StopAllCommand stopAllOpts = new StopAllCommand();
     cl.addCommand("stopAll", stopAllOpts);
+    StopMasterCommand stopMasterOpts = new StopMasterCommand();
+    cl.addCommand("stopMaster", stopMasterOpts);
     cl.parse(args);
     
     if (opts.help || cl.getParsedCommand() == null) {
@@ -94,7 +128,27 @@ public class Admin {
         token = opts.getToken();
       }
       
-      if (cl.getParsedCommand().equals("stop")) {
+      int rc = 0;
+
+      if (cl.getParsedCommand().equals("listInstances")) {
+        ListInstances.listInstances(instance.getZooKeepers(), listIntancesOpts.printAll,
listIntancesOpts.printErrors);
+      } else if (cl.getParsedCommand().equals("ping")) {
+        if (ping(instance, principal, token, pingCommand.args) != 0)
+          rc = 4;
+      } else if (cl.getParsedCommand().equals("checkTablets")) {
+        System.out.println("\n*** Looking for offline tablets ***\n");
+        if (FindOfflineTablets.findOffline(instance, new Credentials(principal, token), checkTabletsCommand.table)
!= 0)
+          rc = 5;
+        System.out.println("\n*** Looking for missing files ***\n");
+        if (checkTabletsCommand.table == null) {
+          if (RemoveEntriesForMissingFiles.checkAllTables(instance, principal, token, checkTabletsCommand.fixFiles)
!= 0)
+            rc = 6;
+        } else {
+          if (RemoveEntriesForMissingFiles.checkTable(instance, principal, token, checkTabletsCommand.table,
checkTabletsCommand.fixFiles) != 0)
+            rc = 6;
+        }
+
+      }else if (cl.getParsedCommand().equals("stop")) {
         stopTabletServer(instance, new Credentials(principal, token), stopOpts.args, opts.force);
       } else {
         everything = cl.getParsedCommand().equals("stopAll");
@@ -104,15 +158,46 @@ public class Admin {
         
         stopServer(instance, new Credentials(principal, token), everything);
       }
+      
+      if (rc != 0)
+        System.exit(rc);
     } catch (AccumuloException e) {
       log.error(e, e);
       System.exit(1);
     } catch (AccumuloSecurityException e) {
       log.error(e, e);
       System.exit(2);
+    } catch (Exception e) {
+      log.error(e, e);
+      System.exit(3);
     }
   }
   
+  private static int ping(Instance instance, String principal, AuthenticationToken token,
List<String> args) throws AccumuloException,
+      AccumuloSecurityException {
+    
+    InstanceOperations io = instance.getConnector(principal, token).instanceOperations();
+    
+    if (args.size() == 0) {
+      args = io.getTabletServers();
+    }
+    
+    int unreachable = 0;
+
+    for (String tserver : args) {
+      try {
+        io.ping(tserver);
+        System.out.println(tserver + " OK");
+      } catch (AccumuloException ae) {
+        System.out.println(tserver + " FAILED (" + ae.getMessage() + ")");
+        unreachable++;
+      }
+    }
+    
+    System.out.printf("\n%d of %d tablet servers unreachable\n\n", unreachable, args.size());
+    return unreachable;
+  }
+
   /**
    * flushing during shutdown is a performance optimization, its not required. The method
will make an attempt to initiate flushes of all tables and give up if
    * it takes too long.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/53a29837/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
b/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
index 7441e71..f5dbdd6 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java
@@ -20,21 +20,30 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.cli.ClientOpts;
 import org.apache.accumulo.server.master.LiveTServerSet;
 import org.apache.accumulo.server.master.LiveTServerSet.Listener;
+import org.apache.accumulo.server.master.state.DistributedStoreException;
 import org.apache.accumulo.server.master.state.MetaDataTableScanner;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.state.TabletLocationState;
 import org.apache.accumulo.server.master.state.TabletState;
+import org.apache.accumulo.server.master.state.ZooTabletStateStore;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.SystemCredentials;
-import org.apache.commons.collections.iterators.IteratorChain;
+import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
 public class FindOfflineTablets {
@@ -46,12 +55,16 @@ public class FindOfflineTablets {
   public static void main(String[] args) throws Exception {
     ClientOpts opts = new ClientOpts();
     opts.parseArgs(FindOfflineTablets.class.getName(), args);
-    final AtomicBoolean scanning = new AtomicBoolean(false);
     Instance instance = opts.getInstance();
-    MetaDataTableScanner rootScanner = new MetaDataTableScanner(instance, SystemCredentials.get(),
MetadataSchema.TabletsSection.getRange());
-    MetaDataTableScanner metaScanner = new MetaDataTableScanner(instance, SystemCredentials.get(),
MetadataSchema.TabletsSection.getRange());
-    @SuppressWarnings("unchecked")
-    Iterator<TabletLocationState> scanner = new IteratorChain(rootScanner, metaScanner);
+    SystemCredentials creds = SystemCredentials.get();
+
+    findOffline(instance, creds, null);
+  }
+  
+  static int findOffline(Instance instance, Credentials creds, String tableName) throws AccumuloException,
TableNotFoundException {
+
+    final AtomicBoolean scanning = new AtomicBoolean(false);
+
     LiveTServerSet tservers = new LiveTServerSet(instance, DefaultConfiguration.getDefaultConfiguration(),
new Listener() {
       @Override
       public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance>
added) {
@@ -63,14 +76,57 @@ public class FindOfflineTablets {
     });
     tservers.startListeningForTabletServerChanges();
     scanning.set(true);
+    
+    Iterator<TabletLocationState> zooScanner;
+    try {
+      zooScanner = new ZooTabletStateStore().iterator();
+    } catch (DistributedStoreException e) {
+      throw new AccumuloException(e);
+    }
+    
+    int offline = 0;
+    
+    System.out.println("Scanning zookeeper");
+    if ((offline = checkTablets(zooScanner, tservers)) > 0)
+      return offline;
+
+    if (RootTable.NAME.equals(tableName))
+      return 0;
+
+    System.out.println("Scanning " + RootTable.NAME);
+    Iterator<TabletLocationState> rootScanner = new MetaDataTableScanner(instance,
creds, MetadataSchema.TabletsSection.getRange(), RootTable.NAME);
+    if ((offline = checkTablets(rootScanner, tservers)) > 0)
+      return offline;
+    
+    if (MetadataTable.NAME.equals(tableName))
+      return 0;
+
+    System.out.println("Scanning " + MetadataTable.NAME);
+    
+    Range range = MetadataSchema.TabletsSection.getRange();
+    if (tableName != null) {
+      String tableId = Tables.getTableId(instance, tableName);
+      range = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
+    }
+    
+    Iterator<TabletLocationState> metaScanner = new MetaDataTableScanner(instance,
creds, range, MetadataTable.NAME);
+    return checkTablets(metaScanner, tservers);
+  }
+  
+  private static int checkTablets(Iterator<TabletLocationState> scanner, LiveTServerSet
tservers) {
+    int offline = 0;
+
     while (scanner.hasNext()) {
       TabletLocationState locationState = scanner.next();
       TabletState state = locationState.getState(tservers.getCurrentServers());
       if (state != null && state != TabletState.HOSTED
-          && TableManager.getInstance().getTableState(locationState.extent.getTableId().toString())
!= TableState.OFFLINE)
-        if (!locationState.extent.equals(RootTable.EXTENT))
-          System.out.println(locationState + " is " + state + "  #walogs:" + locationState.walogs.size());
+          && TableManager.getInstance().getTableState(locationState.extent.getTableId().toString())
!= TableState.OFFLINE) {
+        System.out.println(locationState + " is " + state + "  #walogs:" + locationState.walogs.size());
+        offline++;
+      }
     }
+    
+    return offline;
   }
   
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/53a29837/server/src/main/java/org/apache/accumulo/server/util/ListInstances.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/ListInstances.java b/server/src/main/java/org/apache/accumulo/server/util/ListInstances.java
index daab268..b982829 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/ListInstances.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/ListInstances.java
@@ -63,25 +63,36 @@ public class ListInstances {
       opts.keepers = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST);
     }
     
-    System.out.println("INFO : Using ZooKeepers " + opts.keepers);
-    ZooReader rdr = new ZooReader(opts.keepers, ZOOKEEPER_TIMER_MILLIS); 
-    ZooCache cache = new ZooCache(opts.keepers, ZOOKEEPER_TIMER_MILLIS);
+    String keepers = opts.keepers;
+    boolean printAll = opts.printAll;
+    boolean printErrors = opts.printErrors;
+    
+    listInstances(keepers, printAll, printErrors);
+    
+  }
+
+  static synchronized void listInstances(String keepers, boolean printAll, boolean printErrors)
{
+    errors = 0;
 
-    TreeMap<String,UUID> instanceNames = getInstanceNames(rdr);
+    System.out.println("INFO : Using ZooKeepers " + keepers);
+    ZooReader rdr = new ZooReader(keepers, ZOOKEEPER_TIMER_MILLIS);
+    ZooCache cache = new ZooCache(keepers, ZOOKEEPER_TIMER_MILLIS);
+
+    TreeMap<String,UUID> instanceNames = getInstanceNames(rdr, printErrors);
     
     System.out.println();
     printHeader();
     
     for (Entry<String,UUID> entry : instanceNames.entrySet()) {
-      printInstanceInfo(cache, entry.getKey(), entry.getValue());
+      printInstanceInfo(cache, entry.getKey(), entry.getValue(), printErrors);
     }
     
-    TreeSet<UUID> instancedIds = getInstanceIDs(rdr);
+    TreeSet<UUID> instancedIds = getInstanceIDs(rdr, printErrors);
     instancedIds.removeAll(instanceNames.values());
     
-    if (opts.printAll) {
+    if (printAll) {
       for (UUID uuid : instancedIds) {
-        printInstanceInfo(cache, null, uuid);
+        printInstanceInfo(cache, null, uuid, printErrors);
       }
     } else if (instancedIds.size() > 0) {
       System.out.println();
@@ -90,10 +101,9 @@ public class ListInstances {
       System.out.println();
     }
     
-    if (!opts.printErrors && errors > 0) {
+    if (!printErrors && errors > 0) {
       System.err.println("WARN : There were " + errors + " errors, run with --print-errors
to see more info");
     }
-    
   }
   
   private static class CharFiller implements Formattable {
@@ -122,8 +132,8 @@ public class ListInstances {
     
   }
   
-  private static void printInstanceInfo(ZooCache cache, String instanceName, UUID iid) {
-    String master = getMaster(cache, iid);
+  private static void printInstanceInfo(ZooCache cache, String instanceName, UUID iid, boolean
printErrors) {
+    String master = getMaster(cache, iid, printErrors);
     if (instanceName == null) {
       instanceName = "";
     }
@@ -134,7 +144,8 @@ public class ListInstances {
     
     System.out.printf("%" + NAME_WIDTH + "s |%" + UUID_WIDTH + "s |%" + MASTER_WIDTH + "s%n",
"\"" + instanceName + "\"", iid, master);
   }
-  private static String getMaster(ZooCache cache, UUID iid) {
+  
+  private static String getMaster(ZooCache cache, UUID iid, boolean printErrors) {
     
     if (iid == null) {
       return null;
@@ -148,12 +159,12 @@ public class ListInstances {
       }
       return new String(master);
     } catch (Exception e) {
-      handleException(e);
+      handleException(e, printErrors);
       return null;
     }
   }
   
-  private static TreeMap<String,UUID> getInstanceNames(ZooReader zk) {
+  private static TreeMap<String,UUID> getInstanceNames(ZooReader zk, boolean printErrors)
{
     
     String instancesPath = Constants.ZROOT + Constants.ZINSTANCES;
     
@@ -164,7 +175,7 @@ public class ListInstances {
     try {
       names = zk.getChildren(instancesPath);
     } catch (Exception e) {
-      handleException(e);
+      handleException(e, printErrors);
       return tm;
     }
     
@@ -174,7 +185,7 @@ public class ListInstances {
         UUID iid = UUID.fromString(new String(zk.getData(instanceNamePath, null)));
         tm.put(name, iid);
       } catch (Exception e) {
-        handleException(e);
+        handleException(e, printErrors);
         tm.put(name, null);
       }
     }
@@ -182,7 +193,7 @@ public class ListInstances {
     return tm;
   }
   
-  private static TreeSet<UUID> getInstanceIDs(ZooReader zk) {
+  private static TreeSet<UUID> getInstanceIDs(ZooReader zk, boolean printErrors) {
     TreeSet<UUID> ts = new TreeSet<UUID>();
     
     try {
@@ -198,14 +209,14 @@ public class ListInstances {
         }
       }
     } catch (Exception e) {
-      handleException(e);
+      handleException(e, printErrors);
     }
     
     return ts;
   }
   
-  private static void handleException(Exception e) {
-    if (opts.printErrors) {
+  private static void handleException(Exception e, boolean printErrors) {
+    if (printErrors) {
       e.printStackTrace();
     }
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/53a29837/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
b/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
index c1795e3..25fcd82 100644
--- a/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
+++ b/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
@@ -16,23 +16,41 @@
  */
 package org.apache.accumulo.server.util;
 
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.cli.BatchWriterOpts;
 import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.server.cli.ClientOpts;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.commons.collections.map.LRUMap;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
 import com.beust.jcommander.Parameter;
@@ -49,39 +67,153 @@ public class RemoveEntriesForMissingFiles {
     boolean fix = false;
   }
   
-  public static void main(String[] args) throws Exception {
-    Opts opts = new Opts();
-    ScannerOpts scanOpts = new ScannerOpts();
-    BatchWriterOpts bwOpts = new BatchWriterOpts();
-    opts.parseArgs(RemoveEntriesForMissingFiles.class.getName(), args, scanOpts, bwOpts);
+  private static class CheckFileTask implements Runnable {
+    @SuppressWarnings("rawtypes")
+    private Map cache;
+    private VolumeManager fs;
+    private AtomicInteger missing;
+    private BatchWriter writer;
+    private Key key;
+    private Path path;
+    private Set<Path> processing;
+    private AtomicReference<Exception> exceptionRef;
+
+    @SuppressWarnings({"rawtypes"})
+    CheckFileTask(Map cache, VolumeManager fs, AtomicInteger missing, BatchWriter writer,
Key key, Path map, Set<Path> processing,
+        AtomicReference<Exception> exceptionRef) {
+      this.cache = cache;
+      this.fs = fs;
+      this.missing = missing;
+      this.writer=writer;
+      this.key = key;
+      this.path = map;
+      this.processing = processing;
+      this.exceptionRef = exceptionRef;
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void run() {
+      try {
+        if (!fs.exists(path)) {
+          missing.incrementAndGet();
+
+          Mutation m = new Mutation(key.getRow());
+          m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+          if (writer != null) {
+            writer.addMutation(m);
+            System.out.println("Reference " + path + " removed from " + key.getRow());
+          } else {
+            System.out.println("File " + path + " is missing");
+          }
+        } else {
+          synchronized (processing) {
+            cache.put(path, path);
+          }
+        }
+      } catch (Exception e) {
+        exceptionRef.compareAndSet(null, e);
+      } finally {
+        synchronized (processing) {
+          processing.remove(path);
+          processing.notify();
+        }
+      }
+    }
+  }
+  
+  
+  private static int checkTable(Instance instance, String principal, AuthenticationToken
token, String table, Range range, boolean fix) throws Exception {
+    
+    @SuppressWarnings({"rawtypes"})
+    Map cache = new LRUMap(100000);
+    Set<Path> processing = new HashSet<Path>();
+    ExecutorService threadPool = Executors.newFixedThreadPool(16);
+    
+    System.out.printf("Scanning : %s %s\n", table, range);
+
     VolumeManager fs = VolumeManagerImpl.get();
-    Connector connector = opts.getConnector();
-    Scanner metadata = connector.createScanner(MetadataTable.NAME, opts.auths);
-    metadata.setBatchSize(scanOpts.scanBatchSize);
-    metadata.setRange(MetadataSchema.TabletsSection.getRange());
+    Connector connector = instance.getConnector(principal, token);
+    Scanner metadata = connector.createScanner(table, Authorizations.EMPTY);
+    metadata.setRange(range);
     metadata.fetchColumnFamily(DataFileColumnFamily.NAME);
     int count = 0;
-    int missing = 0;
+    AtomicInteger missing = new AtomicInteger(0);
+    AtomicReference<Exception> exceptionRef = new AtomicReference<Exception>(null);
     BatchWriter writer = null;
-    if (opts.fix)
-      writer = connector.createBatchWriter(MetadataTable.NAME, bwOpts.getBatchWriterConfig());
+
+    if (fix)
+      writer = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+
+
     for (Entry<Key,Value> entry : metadata) {
+      if (exceptionRef.get() != null)
+        break;
+
       count++;
       Key key = entry.getKey();
       Path map = fs.getFullPath(key);
-      if (!fs.exists(map)) {
-        missing++;
-        log.info("File " + map + " is missing");
-        Mutation m = new Mutation(key.getRow());
-        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
-        if (writer != null) {
-          writer.addMutation(m);
-          log.info("entry removed from metadata table: " + m);
+      
+      synchronized (processing) {
+        while (processing.size() >= 64 || processing.contains(map))
+          processing.wait();
+
+        if (cache.get(map) != null) {
+          continue;
         }
+
+        processing.add(map);
       }
+      
+      threadPool.submit(new CheckFileTask(cache, fs, missing, writer, key, map, processing,
exceptionRef));
     }
-    if (writer != null && missing > 0)
+    
+    threadPool.shutdown();
+
+    synchronized (processing) {
+      while (processing.size() > 0)
+        processing.wait();
+    }
+    
+    if (exceptionRef.get() != null)
+      throw new AccumuloException(exceptionRef.get());
+
+    if (writer != null && missing.get() > 0)
       writer.close();
-    log.info(String.format("%d files of %d missing", missing, count));
+
+    System.out.printf("Scan finished, %d files of %d missing\n\n", missing.get(), count);
+    
+    return missing.get();
+  }
+
+  
+
+  static int checkAllTables(Instance instance, String principal, AuthenticationToken token,
boolean fix) throws Exception {
+    int missing = checkTable(instance, principal, token, RootTable.NAME, MetadataSchema.TabletsSection.getRange(),
fix);
+    
+    if (missing == 0)
+      return checkTable(instance, principal, token, MetadataTable.NAME, MetadataSchema.TabletsSection.getRange(),
fix);
+    else
+      return missing;
+  }
+  
+  static int checkTable(Instance instance, String principal, AuthenticationToken token, String
tableName, boolean fix) throws Exception {
+    if (tableName.equals(RootTable.NAME)) {
+      throw new IllegalArgumentException("Can not check root table");
+    } else if (tableName.equals(MetadataTable.NAME)) {
+      return checkTable(instance, principal, token, RootTable.NAME, MetadataSchema.TabletsSection.getRange(),
fix);
+    } else {
+      String tableId = Tables.getTableId(instance, tableName);
+      Range range = new KeyExtent(new Text(tableId), null, null).toMetadataRange();
+      return checkTable(instance, principal, token, MetadataTable.NAME, range, fix);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(RemoveEntriesForMissingFiles.class.getName(), args, scanOpts, bwOpts);
+    
+    checkAllTables(opts.getInstance(), opts.principal, opts.getToken(), opts.fix);
   }
 }


Mime
View raw message