accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1494759 [1/5] - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/client/mock/ core/src/main/java/org/apache/a...
Date Wed, 19 Jun 2013 20:18:32 GMT
Author: ecn
Date: Wed Jun 19 20:18:30 2013
New Revision: 1494759

URL: http://svn.apache.org/r1494759
Log:
ACCUMULO-118 merge to trunk

Added:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java   (with props)
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java   (with props)
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java   (with props)
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java   (with props)
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java   (with props)
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java   (with props)
Removed:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java
Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java
    accumulo/trunk/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletIteratorEnvironment.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java
    accumulo/trunk/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
    accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
    accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java
    accumulo/trunk/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
    accumulo/trunk/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java
    accumulo/trunk/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
    accumulo/trunk/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java Wed Jun 19 20:18:30 2013
@@ -1274,7 +1274,6 @@ public class TableOperationsImpl extends
     
     try {
       FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration());
-      ;
       Map<String,String> props = getExportedProps(fs, new Path(importDir, Constants.EXPORT_FILE));
       
       for (String propKey : props.keySet()) {

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java Wed Jun 19 20:18:30 2013
@@ -230,14 +230,24 @@ class OfflineIterator implements Iterato
     
     if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
       throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
-    
+
     String tablesDir = instance.getConfiguration().get(Property.INSTANCE_DFS_DIR) + "/tables";
+    String[] volumes = instance.getConfiguration().get(Property.INSTANCE_VOLUMES).split(",");
+    if (volumes.length > 1) {
+      tablesDir = volumes[0] + tablesDir;
+    }
     List<String> absFiles = new ArrayList<String>();
     for (String relPath : relFiles) {
-      if (relPath.startsWith(".."))
-        absFiles.add(tablesDir + relPath.substring(2));
-      else
-        absFiles.add(tablesDir + "/" + tableId + relPath);
+      if (relFiles.contains(":")) {
+        absFiles.add(relPath);
+      } else {
+        // handle old-style relative paths
+        if (relPath.startsWith("..")) {
+          absFiles.add(tablesDir + relPath.substring(2));
+        } else {
+          absFiles.add(tablesDir + "/" + tableId + relPath);
+        }
+      }
     }
     
     iter = createIterator(extent, absFiles);

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java Wed Jun 19 20:18:30 2013
@@ -306,11 +306,7 @@ public class MockTableOperations extends
   public List<DiskUsage> getDiskUsage(Set<String> tables) throws AccumuloException, AccumuloSecurityException {
 
     List<DiskUsage> diskUsages = new ArrayList<DiskUsage>();
-    for(String table : tables) {
-      TreeSet<String> tree = new TreeSet<String>();
-      tree.add(table);
-      diskUsages.add(new DiskUsage(tree, 1l));
-    }
+    diskUsages.add(new DiskUsage(new TreeSet<String>(tables), 0l));
 
     return diskUsages;
   }

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Wed Jun 19 20:18:30 2013
@@ -79,6 +79,7 @@ public enum Property {
       "A secret unique to a given instance that all servers must know in order to communicate with one another."
           + " Change it before initialization. To change it later use ./bin/accumulo accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd], "
           + " and then update conf/accumulo-site.xml everywhere."),
+  INSTANCE_VOLUMES("instance.volumes", "", PropertyType.STRING, "A list of volumes to use.  By default, this will be the namenode in the hadoop configuration in the accumulo classpath."),
   INSTANCE_SECURITY_AUTHENTICATOR("instance.security.authenticator", "org.apache.accumulo.server.security.handler.ZKAuthenticator", PropertyType.CLASSNAME,
       "The authenticator class that accumulo will use to determine if a user has privilege to perform an action"),
   INSTANCE_SECURITY_AUTHORIZOR("instance.security.authorizor", "org.apache.accumulo.server.security.handler.ZKAuthorizor", PropertyType.CLASSNAME,
@@ -100,6 +101,8 @@ public enum Property {
   GENERAL_KERBEROS_PRINCIPAL("general.kerberos.principal", "", PropertyType.STRING, "Name of the kerberos principal to use. _HOST will automatically be "
       + "replaced by the machines hostname in the hostname portion of the principal. Leave blank if not using kerberoized hdfs"),
   GENERAL_MAX_MESSAGE_SIZE("tserver.server.message.size.max", "1G", PropertyType.MEMORY, "The maximum size of a message that can be sent to a tablet server."),
+  GENERAL_VOLUME_CHOOSER("general.volume.chooser", "org.apache.accumulo.server.fs.RandomVolumeChooser", PropertyType.CLASSNAME, "The class that will be used to select which volume will be used to create new files."),
+
   
   // properties that are specific to master server behavior
   MASTER_PREFIX("master.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the master server"),

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java Wed Jun 19 20:18:30 2013
@@ -22,17 +22,13 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.util.TableDiskUsage;
-import org.apache.accumulo.core.util.TableDiskUsage.Printer;
+import org.apache.accumulo.core.client.admin.DiskUsage;
+import org.apache.accumulo.core.util.NumUtil;
 import org.apache.accumulo.core.util.shell.Shell;
 import org.apache.accumulo.core.util.shell.Shell.Command;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 
 public class DUCommand extends Command {
   
@@ -51,22 +47,17 @@ public class DUCommand extends Command {
         }
       }
     } else {
-      shellState.checkTableState();
-      tablesToFlush.add(shellState.getTableName());
+      if (tablesToFlush.isEmpty()) {
+        shellState.checkTableState();
+        tablesToFlush.add(shellState.getTableName());
+      }
     }
     try {
-      final AccumuloConfiguration acuConf = new ConfigurationCopy(shellState.getConnector().instanceOperations().getSystemConfiguration());
-      TableDiskUsage.printDiskUsage(acuConf, tablesToFlush, FileSystem.get(new Configuration()), shellState.getConnector(), new Printer() {
-        @Override
-        public void print(String line) {
-          try {
-            shellState.getReader().println(line);
-          } catch (IOException ex) {
-            throw new RuntimeException(ex);
-          }
-        }
-        
-      }, prettyPrint);
+      String valueFormat = prettyPrint ? "%9s" : "%,24d";
+      for (DiskUsage usage : shellState.getConnector().tableOperations().getDiskUsage(tablesToFlush)) {
+        Object value = prettyPrint ? NumUtil.bigNumberForSize(usage.getUsage()) : usage.getUsage();
+        shellState.getReader().println(String.format(valueFormat + " %s", value, usage.getTables()));
+      }
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }

Modified: accumulo/trunk/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java (original)
+++ accumulo/trunk/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java Wed Jun 19 20:18:30 2013
@@ -982,7 +982,7 @@ public class SimpleTest {
     assertEquals("10", new String(more.getResults().get(0).getValue()));
     try {
       client.checkIteratorConflicts(creds, TABLE_TEST, setting, EnumSet.allOf(IteratorScope.class));
-      fail("checkIteratorConflicts did not throw and exception");
+      fail("checkIteratorConflicts did not throw an exception");
     } catch (Exception ex) {}
     client.deleteRows(creds, TABLE_TEST, null, null);
     client.removeIterator(creds, TABLE_TEST, "test", EnumSet.allOf(IteratorScope.class));

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/Accumulo.java Wed Jun 19 20:18:30 2013
@@ -20,7 +20,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Map.Entry;
@@ -33,12 +32,11 @@ import org.apache.accumulo.core.util.Uti
 import org.apache.accumulo.core.util.Version;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.log4j.Logger;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.xml.DOMConfigurator;
@@ -48,21 +46,21 @@ public class Accumulo {
   
   private static final Logger log = Logger.getLogger(Accumulo.class);
   
-  public static synchronized void updateAccumuloVersion(FileSystem fs) {
+  public static synchronized void updateAccumuloVersion(VolumeManager fs) {
     try {
       if (getAccumuloPersistentVersion(fs) == ServerConstants.PREV_DATA_VERSION) {
         fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + ServerConstants.DATA_VERSION));
-        fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + ServerConstants.PREV_DATA_VERSION), false);
+        fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + ServerConstants.PREV_DATA_VERSION));
       }
     } catch (IOException e) {
       throw new RuntimeException("Unable to set accumulo version: an error occurred.", e);
     }
   }
   
-  public static synchronized int getAccumuloPersistentVersion(FileSystem fs) {
+  public static synchronized int getAccumuloPersistentVersion(VolumeManager fs) {
     int dataVersion;
     try {
-      FileStatus[] files = fs.listStatus(ServerConstants.getDataVersionLocation());
+      FileStatus[] files = fs.getDefaultVolume().listStatus(ServerConstants.getDataVersionLocation());
       if (files == null || files.length == 0) {
         dataVersion = -1; // assume it is 0.5 or earlier
       } else {
@@ -82,7 +80,7 @@ public class Accumulo {
     }
   }
   
-  public static void init(FileSystem fs, ServerConfiguration config, String application) throws UnknownHostException {
+  public static void init(VolumeManager fs, ServerConfiguration config, String application) throws UnknownHostException {
     
     System.setProperty("org.apache.accumulo.core.application", application);
     
@@ -183,7 +181,7 @@ public class Accumulo {
     return result.getHostName();
   }
   
-  public static void waitForZookeeperAndHdfs(FileSystem fs) {
+  public static void waitForZookeeperAndHdfs(VolumeManager fs) {
     log.info("Attempting to talk to zookeeper");
     while (true) {
       try {
@@ -200,7 +198,7 @@ public class Accumulo {
     long sleep = 1000;
     while (true) {
       try {
-        if (!isInSafeMode(fs))
+        if (fs.isReady())
           break;
         log.warn("Waiting for the NameNode to leave safemode");
       } catch (IOException ex) {
@@ -213,37 +211,4 @@ public class Accumulo {
     log.info("Connected to HDFS");
   }
   
-  private static boolean isInSafeMode(FileSystem fs) throws IOException {
-    if (!(fs instanceof DistributedFileSystem))
-      return false;
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
-    // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
-    // Becomes this:
-    Class<?> safeModeAction;
-    try {
-      // hadoop 2.0
-      safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
-    } catch (ClassNotFoundException ex) {
-      // hadoop 1.0
-      try {
-        safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
-      } catch (ClassNotFoundException e) {
-        throw new RuntimeException("Cannot figure out the right class for Constants");
-      }
-    }
-    Object get = null;
-    for (Object obj : safeModeAction.getEnumConstants()) {
-      if (obj.toString().equals("SAFEMODE_GET"))
-        get = obj;
-    }
-    if (get == null) {
-      throw new RuntimeException("cannot find SAFEMODE_GET");
-    }
-    try {
-      Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
-      return (Boolean) setSafeMode.invoke(dfs, get);
-    } catch (Exception ex) {
-      throw new RuntimeException("cannot find method setSafeMode");
-    }
-  }
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/ServerConstants.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/ServerConstants.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/ServerConstants.java Wed Jun 19 20:18:30 2013
@@ -17,9 +17,11 @@
 package org.apache.accumulo.server;
 
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.MetadataTable;
 import org.apache.accumulo.core.util.RootTable;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 public class ServerConstants {
@@ -30,40 +32,60 @@ public class ServerConstants {
   public static final int PREV_DATA_VERSION = 4;
   
   // these are functions to delay loading the Accumulo configuration unless we must
-  public static String getBaseDir() {
-    return ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
+  public static String[] getBaseDirs() {
+    String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
+    String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES);
+    if (ns == null || ns.isEmpty()) {
+      Configuration hadoopConfig = CachedConfiguration.getInstance();
+      String fullPath = hadoopConfig.get("fs.default.name") + singleNamespace;
+      return new String[] { fullPath };
+    }
+    String namespaces[] = ns.split(",");
+    if (namespaces.length < 2) {
+      Configuration hadoopConfig = CachedConfiguration.getInstance();
+      String fullPath = hadoopConfig.get("fs.default.name") + singleNamespace;
+      return new String[] { fullPath };
+    }
+    return prefix(namespaces, singleNamespace);
   }
   
-  public static String getTablesDir() {
-    return getBaseDir() + "/tables";
+  public static String[] prefix(String bases[], String suffix) {
+    String result[] = new String[bases.length];
+    for (int i = 0; i < bases.length; i++) {
+      result[i] = bases[i] + "/" + suffix;
+    }
+    return result;
   }
   
-  public static String getRecoveryDir() {
-    return getBaseDir() + "/recovery";
+  public static String[] getTablesDirs() {
+    return prefix(getBaseDirs(), "tables");
   }
   
-  public static String getWalDirectory() {
-    return getBaseDir() + "/wal";
+  public static String[] getRecoveryDirs() {
+    return prefix(getBaseDirs(), "recovery");
   }
   
-  public static Path getInstanceIdLocation() {
-    return new Path(getBaseDir() + "/instance_id");
+  public static String[] getWalDirs() {
+    return prefix(getBaseDirs(), "wal");
   }
   
-  public static Path getDataVersionLocation() {
-    return new Path(getBaseDir() + "/version");
+  public static String[] getWalogArchives() {
+    return prefix(getBaseDirs(), "walogArchive");
   }
   
-  public static String getMetadataTableDir() {
-    return getTablesDir() + "/" + MetadataTable.ID;
+  public static Path getInstanceIdLocation() {
+    return new Path(getBaseDirs()[0] + "/instance_id");
   }
   
-  public static String getRootTableDir() {
-    return getTablesDir() + "/" + MetadataTable.ID;
+  public static Path getDataVersionLocation() {
+    return new Path(getBaseDirs()[0] + "/version");
   }
   
-  public static String getRootTabletDir() {
-    return getRootTableDir() + RootTable.ZROOT_TABLET;
+  public static String[] getMetadataTableDirs() {
+    return prefix(getTablesDirs(), MetadataTable.ID);
   }
   
+  public static String getRootTabletDir() {
+    return prefix(getMetadataTableDirs(), RootTable.ZROOT_TABLET)[0];
+  }
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java Wed Jun 19 20:18:30 2013
@@ -52,10 +52,10 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.TableDiskUsage;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.util.TableDiskUsage;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 import org.apache.accumulo.trace.thrift.TInfo;
@@ -335,7 +335,7 @@ public class ClientServiceHandler implem
       FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), conf);
       
       // use the same set of tableIds that were validated above to avoid race conditions
-      Map<TreeSet<String>,Long> diskUsage = TableDiskUsage.getDiskUsage(new ServerConfiguration(instance).getConfiguration(), tableIds, fs, conn, false);
+      Map<TreeSet<String>,Long> diskUsage = TableDiskUsage.getDiskUsage(new ServerConfiguration(instance).getConfiguration(), tableIds, fs, conn);
       List<TDiskUsage> retUsages = new ArrayList<TDiskUsage>();
       for (Map.Entry<TreeSet<String>,Long> usageItem : diskUsage.entrySet()) {
         retUsages.add(new TDiskUsage(new ArrayList<String>(usageItem.getKey()), usageItem.getValue()));

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java?rev=1494759&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java Wed Jun 19 20:18:30 2013
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * This is a glue object, to convert short file references to long references.
+ * The !METADATA table may contain old relative file references.  This class keeps 
+ * track of the short file reference, so it can be removed properly from the !METADATA table.
+ */
+public class FileRef implements Comparable<FileRef> {
+  String metaReference;  // something like ../2/d-00000/A00001.rf
+  Path fullReference;  // something like hdfs://nn:9001/accumulo/tables/2/d-00000/A00001.rf
+  
+  public FileRef(VolumeManager fs, Key key) {
+    metaReference = key.getColumnQualifier().toString();
+    fullReference = fs.getFullPath(key);
+  }
+  
+  public FileRef(String metaReference, Path fullReference) {
+    this.metaReference = metaReference;
+    this.fullReference = fullReference;
+  }
+  
+  public FileRef(String path) {
+    this.metaReference = path;
+    this.fullReference = new Path(path);
+  }
+  
+  public String toString() {
+    return fullReference.toString();
+  }
+  
+  public Path path() {
+    return fullReference;
+  }
+  
+  public Text meta() {
+    return new Text(metaReference);
+  }
+
+  @Override
+  public int compareTo(FileRef o) {
+    return path().compareTo(o.path());
+  }
+
+  @Override
+  public int hashCode() {
+    return path().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof FileRef) {
+      return compareTo((FileRef)obj) == 0;
+    }
+    return false;
+  }
+  
+  
+}

Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java?rev=1494759&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java Wed Jun 19 20:18:30 2013
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.util.Random;
+
+public class RandomVolumeChooser implements VolumeChooser {
+  Random random = new Random();
+  
+  @Override
+  public String choose(String[] options) {
+    return options[random.nextInt(options.length)];
+  }
+
+}

Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/RandomVolumeChooser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java?rev=1494759&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java Wed Jun 19 20:18:30 2013
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+
+/**
+ * Pluggable strategy for deciding which volume (base path) a new file
+ * should be created on.
+ */
+public interface VolumeChooser {
+  // Return one element of options: the volume on which to create the next file.
+  String choose(String[] options);
+}

Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeChooser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java?rev=1494759&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java Wed Jun 19 20:18:30 2013
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A wrapper around multiple hadoop FileSystem objects, which are assumed to be different volumes.
+ * This also concentrates a bunch of meta-operations like waiting for SAFE_MODE, and closing WALs.
+ */
+public interface VolumeManager {
+  
+  // close the underlying FileSystems
+  void close() throws IOException;
+  
+  // the mechanism by which the master ensures that tablet servers can no longer write to a WAL
+  boolean closePossiblyOpenFile(Path path) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  FSDataOutputStream create(Path dest) throws IOException;
+  
+  // forward to the appropriate FileSystem object; b is the overwrite flag
+  FSDataOutputStream create(Path path, boolean b) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  // (args: overwrite flag, buffer size, replication, block size)
+  FSDataOutputStream create(Path path, boolean b, int int1, short int2, long long1) throws IOException;
+  
+  // create a file, but only if it doesn't exist
+  boolean createNewFile(Path writable) throws IOException;
+  
+  // create a file which can be sync'd to disk
+  FSDataOutputStream createSyncable(Path logPath, int buffersize, short replication, long blockSize) throws IOException;
+  
+  // delete a file (non-recursive)
+  boolean delete(Path path) throws IOException;
+  
+  // delete a directory and anything under it
+  boolean deleteRecursively(Path path) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  boolean exists(Path path) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  FileStatus getFileStatus(Path path) throws IOException;
+  
+  // find the appropriate FileSystem object given a path
+  FileSystem getFileSystemByPath(Path path);
+  
+  // get a mapping of volume to FileSystem
+  Map<String, ? extends FileSystem> getFileSystems();
+  
+  // return the item in options that is in the same volume as source; null if none match
+  Path matchingFileSystem(Path source, String[] options);
+  
+  // create a new path in the same volume as the sourceDir; null if sourceDir matches no volume
+  String newPathOnSameVolume(String sourceDir, String suffix);
+  
+  // forward to the appropriate FileSystem object
+  FileStatus[] listStatus(Path path) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  boolean mkdirs(Path directory) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  FSDataInputStream open(Path path) throws IOException;
+  
+  // forward to the appropriate FileSystem object, throws an exception if the paths are in different volumes
+  boolean rename(Path path, Path newPath) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  boolean moveToTrash(Path sourcePath) throws IOException;
+  
+  // forward to the appropriate FileSystem object
+  short getDefaultReplication(Path logPath);
+  
+  // forward to the appropriate FileSystem object
+  boolean isFile(Path path) throws IOException;
+  
+  // all volume are ready to provide service (not in SafeMode, for example)
+  boolean isReady() throws IOException;
+  
+  // ambiguous references to files (no volume in the path) go here
+  FileSystem getDefaultVolume();
+  
+  // forward to the appropriate FileSystem object
+  FileStatus[] globStatus(Path path) throws IOException;
+
+  // Convert a file or directory !METADATA reference into a fully qualified path
+  Path getFullPath(Key key);
+  
+  // Given a filename, figure out the qualified path given multiple namespaces
+  Path getFullPath(String paths[], String fileName) throws IOException;
+
+  // forward to the appropriate FileSystem object
+  ContentSummary getContentSummary(Path dir) throws IOException;
+
+  // decide on which of the given locations to create a new file
+  String choose(String[] options);
+}

Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java?rev=1494759&view=auto
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java (added)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java Wed Jun 19 20:18:30 2013
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+public class VolumeManagerImpl implements VolumeManager {
+  
+  private static final Logger log = Logger.getLogger(FileSystem.class);
+  
+  Map<String, ? extends FileSystem> volumes;
+  String defaultVolumes;
+  AccumuloConfiguration conf;
+  VolumeChooser chooser;
+  
+  protected VolumeManagerImpl(Map<String, ? extends FileSystem> volumes, String defaultVolume, AccumuloConfiguration conf) {
+    this.volumes = volumes;
+    this.defaultVolumes = defaultVolume;
+    this.conf = conf;
+    ensureSyncIsEnabled();
+    try {
+      chooser = (VolumeChooser)this.getClass().getClassLoader().loadClass(conf.get(Property.GENERAL_VOLUME_CHOOSER)).newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  public static org.apache.accumulo.server.fs.VolumeManager getLocal() throws IOException {
+    return new VolumeManagerImpl(Collections.singletonMap("", FileSystem.getLocal(CachedConfiguration.getInstance())), "", DefaultConfiguration.getDefaultConfiguration());
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOException ex = null;
+    for (FileSystem fs : volumes.values()) {
+      try {
+        fs.close();
+      } catch (IOException e) {
+        ex = e;
+      }
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+  
+  @Override
+  public boolean closePossiblyOpenFile(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    if (fs instanceof DistributedFileSystem) {
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        return dfs.recoverLease(path);
+      } catch (FileNotFoundException ex) {
+        throw ex;
+      } 
+    } else if (fs instanceof LocalFileSystem) {
+      // ignore
+    } else {
+      throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
+    }
+    fs.append(path).close();
+    log.info("Recovered lease on " + path.toString() + " using append");
+    return true;
+  }
+  
+  @Override
+  public FSDataOutputStream create(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.create(path);
+  }
+  
+  @Override
+  public FSDataOutputStream create(Path path, boolean overwrite) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.create(path, overwrite);
+  }
+  
+  private static long correctBlockSize(Configuration conf, long blockSize) {
+    if (blockSize <= 0)
+      blockSize = conf.getLong("dfs.block.size", 67108864);
+    
+    int checkSum = conf.getInt("io.bytes.per.checksum", 512);
+    blockSize -= blockSize % checkSum;
+    blockSize = Math.max(blockSize, checkSum);
+    return blockSize;
+  }
+
+  private static int correctBufferSize(Configuration conf, int bufferSize) {
+    if (bufferSize <= 0)
+      bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    return bufferSize;
+  }
+  
+
+  @Override
+  public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    if (bufferSize == 0) {
+      fs.getConf().getInt("io.file.buffer.size", 4096);
+    }
+    return fs.create(path, overwrite, bufferSize, replication, correctBlockSize(fs.getConf(), blockSize));
+  }
+
+  @Override
+  public boolean createNewFile(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.createNewFile(path);
+  }
+  @Override
+  public FSDataOutputStream createSyncable(Path logPath, int bufferSize, short replication, long blockSize) throws IOException {
+    FileSystem fs = getFileSystemByPath(logPath);
+    blockSize = correctBlockSize(fs.getConf(), blockSize);
+    bufferSize = correctBufferSize(fs.getConf(), bufferSize);
+    try {
+      // This... 
+      //    EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+      //    return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
+      // Becomes this:
+      Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
+      List<Enum<?>> flags = new ArrayList<Enum<?>>();
+      if (createFlags.isEnum()) {
+        for (Object constant : createFlags.getEnumConstants()) {
+          if (constant.toString().equals("SYNC_BLOCK")) {
+            flags.add((Enum<?>)constant);
+            log.debug("Found synch enum " + constant);
+          }
+          if (constant.toString().equals("CREATE")) {
+            flags.add((Enum<?>)constant);
+            log.debug("Found CREATE enum " + constant);
+          }
+        }
+      }
+      Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
+      log.debug("CreateFlag set: " + set);
+      Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
+      log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
+      return (FSDataOutputStream)create.invoke(fs, logPath, FsPermission.getDefault(), set, bufferSize, replication, blockSize, null);
+    } catch (ClassNotFoundException ex) {
+      // Expected in hadoop 1.0
+      return fs.create(logPath, true, bufferSize, replication, blockSize);
+    } catch (Exception ex) {
+      log.debug(ex, ex);
+      return fs.create(logPath, true, bufferSize, replication, blockSize);
+    }
+  }
+
+  @Override
+  public boolean delete(Path path)  throws IOException{
+    return getFileSystemByPath(path).delete(path, false);
+  }
+
+  @Override
+  public boolean deleteRecursively(Path path) throws IOException {
+    return getFileSystemByPath(path).delete(path, true);
+  }
+
+  private void ensureSyncIsEnabled() {
+    for (FileSystem fs : getFileSystems().values()) {
+      if (fs instanceof DistributedFileSystem) {
+        if (!fs.getConf().getBoolean("dfs.durable.sync", false) && !fs.getConf().getBoolean("dfs.support.append", false)) {
+          String msg = "Must set dfs.durable.sync OR dfs.support.append to true.  Which one needs to be set depends on your version of HDFS.  See ACCUMULO-623. \n"
+              + "HADOOP RELEASE          VERSION           SYNC NAME             DEFAULT\n"
+              + "Apache Hadoop           0.20.205          dfs.support.append    false\n"
+              + "Apache Hadoop            0.23.x           dfs.support.append    true\n"
+              + "Apache Hadoop             1.0.x           dfs.support.append    false\n"
+              + "Apache Hadoop             1.1.x           dfs.durable.sync      true\n"
+              + "Apache Hadoop          2.0.0-2.0.2        dfs.support.append    true\n"
+              + "Cloudera CDH             3u0-3u3             ????               true\n"
+              + "Cloudera CDH               3u4            dfs.support.append    true\n"
+              + "Hortonworks HDP           `1.0            dfs.support.append    false\n"
+              + "Hortonworks HDP           `1.1            dfs.support.append    false";
+          log.fatal(msg);
+          System.exit(-1);
+        }
+        try {
+          // if this class exists
+          Class.forName("org.apache.hadoop.fs.CreateFlag");
+          // we're running hadoop 2.0, 1.1
+          if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
+            log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss");
+          }
+        } catch (ClassNotFoundException ex) {
+          // hadoop 1.0
+        }
+      }
+    }
+    
+  }
+
+  @Override
+  public boolean exists(Path path) throws IOException {
+    return getFileSystemByPath(path).exists(path);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    return getFileSystemByPath(path).getFileStatus(path);
+  }
+
+  @Override
+  public FileSystem getFileSystemByPath(Path path) {
+    if (path.toString().contains(":"))
+    {
+      try {
+        return path.getFileSystem(CachedConfiguration.getInstance());
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+      
+    return volumes.get(defaultVolumes);
+  }
+
+  @Override
+  public Map<String, ? extends FileSystem>  getFileSystems() {
+    return volumes;
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path path) throws IOException {
+    return getFileSystemByPath(path).listStatus(path); 
+  }
+
+  @Override
+  public boolean mkdirs(Path path) throws IOException {
+    return getFileSystemByPath(path).mkdirs(path);
+  }
+
+  @Override
+  public FSDataInputStream open(Path path) throws IOException {
+    return getFileSystemByPath(path).open(path);
+  }
+
+  @Override
+  public boolean rename(Path path, Path newPath)throws IOException {
+    FileSystem source = getFileSystemByPath(path);
+    FileSystem dest = getFileSystemByPath(newPath);
+    if (source != dest) {
+      throw new NotImplementedException("Cannot rename files across volumes: " + path + " -> " + newPath);
+    }
+    return source.rename(path, newPath);
+  }
+
+  @Override
+  public boolean moveToTrash(Path path) throws IOException{
+    FileSystem fs = getFileSystemByPath(path);
+    Trash trash = new Trash(fs, fs.getConf());
+    return trash.moveToTrash(path);
+  }
+
+  @Override
+  public short getDefaultReplication(Path path) {
+    return getFileSystemByPath(path).getDefaultReplication();
+  }
+
+  @Override
+  public boolean isFile(Path path) throws IOException {
+    return getFileSystemByPath(path).isFile(path);
+  }
+
+  public static VolumeManager get() throws IOException {
+    AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
+    return get(conf);
+  }
+  
+  static private final String DEFAULT = "";
+
+  public static VolumeManager get(AccumuloConfiguration conf) throws IOException {
+    Map<String, FileSystem> fileSystems = new HashMap<String, FileSystem>();
+    Configuration hadoopConf = CachedConfiguration.getInstance();
+    fileSystems.put(DEFAULT, FileSystem.get(hadoopConf));
+    String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_VOLUMES);
+    if (ns != null) {
+      for (String space : ns.split(",")) {
+        if (space.contains(":")) {
+          fileSystems.put(space, new Path(space).getFileSystem(hadoopConf));
+        } else {
+          fileSystems.put(space, FileSystem.get(hadoopConf));
+        }
+      }
+    }
+    return new VolumeManagerImpl(fileSystems, "", conf);
+  }
+
+  @Override
+  public boolean isReady() throws IOException {
+    for (FileSystem fs : getFileSystems().values()) {
+      if (!(fs instanceof DistributedFileSystem))
+        continue;
+      DistributedFileSystem dfs = (DistributedFileSystem)fs;
+      // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
+      // Becomes this:
+      Class<?> safeModeAction;
+      try {
+        // hadoop 2.0
+        safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
+      } catch (ClassNotFoundException ex) {
+        // hadoop 1.0
+        try {
+          safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
+        } catch (ClassNotFoundException e) {
+          throw new RuntimeException("Cannot figure out the right class for Constants");
+        }
+      }
+      Object get = null;
+      for (Object obj : safeModeAction.getEnumConstants()) {
+        if (obj.toString().equals("SAFEMODE_GET"))
+          get = obj;
+      }
+      if (get == null) {
+        throw new RuntimeException("cannot find SAFEMODE_GET");
+      }
+      try {
+        Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+        boolean inSafeMode = (Boolean) setSafeMode.invoke(dfs, get);
+        if (inSafeMode) {
+          return false;
+        }
+      } catch (Exception ex) {
+        throw new RuntimeException("cannot find method setSafeMode");
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public FileSystem getDefaultVolume() {
+    return volumes.get(defaultVolumes);
+  }
+
+  @Override
+  public FileStatus[] globStatus(Path pathPattern) throws IOException {
+    return getFileSystemByPath(pathPattern).globStatus(pathPattern);
+  }
+  
+  @Override
+  public Path getFullPath(Key key) {
+    
+    String relPath = key.getColumnQualifierData().toString();
+    if (relPath.contains(":"))
+      return new Path(relPath);
+   
+    byte [] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
+    
+    if (relPath.startsWith("../"))
+      relPath = relPath.substring(2);
+    else
+      relPath = "/" + new String(tableId) + relPath;
+    Path fullPath = new Path(ServerConstants.getTablesDirs()[0] + relPath);
+    FileSystem fs = getFileSystemByPath(fullPath);
+    return fs.makeQualified(fullPath);
+  }
+
+  @Override
+  public Path matchingFileSystem(Path source, String[] options) {
+    for (String fs : getFileSystems().keySet()) {
+      for (String option : options) {
+        if (option.startsWith(fs))
+          return new Path(option);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public String newPathOnSameVolume(String sourceDir, String suffix) {
+    for (String fs : getFileSystems().keySet()) {
+        if (sourceDir.startsWith(fs)) {
+          return fs + "/" + suffix;
+        }
+    }
+    return null;
+  }
+
+  @Override
+  public Path getFullPath(String[] paths, String fileName) throws IOException {
+    if (fileName.contains(":"))
+      return new Path(fileName);
+    // TODO: ACCUMULO-118
+    // How do we want it to work?  Find it somewhere? or find it in the default file system?
+    // old-style name, on one of many possible "root" paths:
+    if (fileName.startsWith("../"))
+      fileName = fileName.substring(2);
+    for (String path : paths) {
+      String fullPath;
+      if (path.endsWith("/") || fileName.startsWith("/"))
+        fullPath = path + fileName;
+      else
+        fullPath = path + "/" + fileName;
+      Path exists = new Path(fullPath);
+      FileSystem ns = getFileSystemByPath(exists);
+      if (ns.exists(exists)) {
+        Path result = ns.makeQualified(exists);
+        log.debug("Found " + exists + " on " + path + " as " + result);
+        return ns.makeQualified(exists);
+      }
+    }
+    throw new IOException("Could not find file " + fileName + " in " + Arrays.asList(paths));
+  }
+
+  @Override
+  public ContentSummary getContentSummary(Path dir) throws IOException {
+    return getFileSystemByPath(dir).getContentSummary(dir);
+  }
+
+  @Override
+  public String choose(String[] options) {
+    return chooser.choose(options);
+  }
+
+}

Propchange: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Wed Jun 19 20:18:30 2013
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.tabletse
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.accumulo.server.util.MetadataTable;
@@ -48,9 +49,7 @@ import org.apache.accumulo.trace.instrum
 import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Trash;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
@@ -59,15 +58,13 @@ public class GarbageCollectWriteAheadLog
   private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
   
   private final Instance instance;
-  private final FileSystem fs;
+  private final VolumeManager fs;
   
-  private Trash trash;
+  private boolean useTrash;
   
-  GarbageCollectWriteAheadLogs(Instance instance, FileSystem fs, boolean noTrash) throws IOException {
+  GarbageCollectWriteAheadLogs(Instance instance, VolumeManager fs, boolean useTrash) throws IOException {
     this.instance = instance;
     this.fs = fs;
-    if (!noTrash)
-      this.trash = new Trash(fs, fs.getConf());
   }
   
   public void collect(GCStatus status) {
@@ -75,11 +72,11 @@ public class GarbageCollectWriteAheadLog
     Span span = Trace.start("scanServers");
     try {
       
-      Set<String> sortedWALogs = getSortedWALogs();
+      Set<Path> sortedWALogs = getSortedWALogs();
       
       status.currentLog.started = System.currentTimeMillis();
       
-      Map<String,String> fileToServerMap = new HashMap<String,String>();
+      Map<Path,String> fileToServerMap = new HashMap<Path,String>();
       int count = scanServers(fileToServerMap);
       long fileScanStop = System.currentTimeMillis();
       log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count,
@@ -101,7 +98,7 @@ public class GarbageCollectWriteAheadLog
       log.info(String.format("%d log entries scanned in %.2f seconds", count, (logEntryScanStop - fileScanStop) / 1000.));
       
       span = Trace.start("removeFiles");
-      Map<String,ArrayList<String>> serverToFileMap = mapServersToFiles(fileToServerMap);
+      Map<String,ArrayList<Path>> serverToFileMap = mapServersToFiles(fileToServerMap);
       
       count = removeFiles(serverToFileMap, sortedWALogs, status);
       
@@ -131,39 +128,36 @@ public class GarbageCollectWriteAheadLog
     }
   }
   
-  private int removeFiles(Map<String,ArrayList<String>> serverToFileMap, Set<String> sortedWALogs, final GCStatus status) {
+  private int removeFiles(Map<String,ArrayList<Path>> serverToFileMap, Set<Path> sortedWALogs, final GCStatus status) {
     AccumuloConfiguration conf = instance.getConfiguration();
-    for (Entry<String,ArrayList<String>> entry : serverToFileMap.entrySet()) {
-      if (entry.getKey().length() == 0) {
+    for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
+      if (entry.getKey().isEmpty()) {
         // old-style log entry, just remove it
-        for (String filename : entry.getValue()) {
-          log.debug("Removing old-style WAL " + entry.getValue());
+        for (Path path : entry.getValue()) {
+          log.debug("Removing old-style WAL " + path);
           try {
-            Path path = new Path(ServerConstants.getWalDirectory(), filename);
-            if (trash == null || !trash.moveToTrash(path))
-              fs.delete(path, true);
+            if (!useTrash || !fs.moveToTrash(path))
+              fs.deleteRecursively(path);
             status.currentLog.deleted++;
           } catch (FileNotFoundException ex) {
             // ignored
           } catch (IOException ex) {
-            log.error("Unable to delete wal " + filename + ": " + ex);
+            log.error("Unable to delete wal " + path + ": " + ex);
           }
         }
       } else {
         InetSocketAddress address = AddressUtil.parseAddress(entry.getKey());
         if (!holdsLock(address)) {
-          Path serverPath = new Path(ServerConstants.getWalDirectory(), entry.getKey());
-          for (String filename : entry.getValue()) {
-            log.debug("Removing WAL for offline server " + filename);
+          for (Path path : entry.getValue()) {
+            log.debug("Removing WAL for offline server " + path);
             try {
-              Path path = new Path(serverPath, filename);
-              if (trash == null || !trash.moveToTrash(path))
-                fs.delete(path, true);
+              if (!useTrash || !fs.moveToTrash(path))
+                fs.deleteRecursively(path);
               status.currentLog.deleted++;
             } catch (FileNotFoundException ex) {
               // ignored
             } catch (IOException ex) {
-              log.error("Unable to delete wal " + filename + ": " + ex);
+              log.error("Unable to delete wal " + path + ": " + ex);
             }
           }
           continue;
@@ -171,7 +165,7 @@ public class GarbageCollectWriteAheadLog
           Client tserver = null;
           try {
             tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-            tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), entry.getValue());
+            tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), paths2strings(entry.getValue()));
             log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
             status.currentLog.deleted += entry.getValue().size();
           } catch (TException e) {
@@ -184,24 +178,21 @@ public class GarbageCollectWriteAheadLog
       }
     }
     
-    Path recoveryDir = new Path(ServerConstants.getRecoveryDir());
-    
-    for (String sortedWALog : sortedWALogs) {
-      log.debug("Removing sorted WAL " + sortedWALog);
-      Path swalog = new Path(recoveryDir, sortedWALog);
+    for (Path swalog : sortedWALogs) {
+      log.debug("Removing sorted WAL " + swalog);
       try {
-        if (trash == null || !trash.moveToTrash(swalog)) {
-          fs.delete(swalog, true);
+        if (!useTrash || !fs.moveToTrash(swalog)) {
+          fs.deleteRecursively(swalog);
         }
       } catch (FileNotFoundException ex) {
         // ignored
       } catch (IOException ioe) {
         try {
           if (fs.exists(swalog)) {
-            log.error("Unable to delete sorted walog " + sortedWALog + ": " + ioe);
+            log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
           }
         } catch (IOException ex) {
-          log.error("Unable to check for the existence of " + sortedWALog, ex);
+          log.error("Unable to check for the existence of " + swalog, ex);
         }
       }
     }
@@ -209,30 +200,42 @@ public class GarbageCollectWriteAheadLog
     return 0;
   }
   
-  private static Map<String,ArrayList<String>> mapServersToFiles(Map<String,String> fileToServerMap) {
-    Map<String,ArrayList<String>> serverToFileMap = new HashMap<String,ArrayList<String>>();
-    for (Entry<String,String> fileServer : fileToServerMap.entrySet()) {
-      ArrayList<String> files = serverToFileMap.get(fileServer.getValue());
+  private List<String> paths2strings(ArrayList<Path> paths) {
+    List<String> result = new ArrayList<String>(paths.size());
+    for (Path path : paths)
+      result.add(path.toString());
+    return result;
+  }
+
+  private static Map<String,ArrayList<Path>> mapServersToFiles(Map<Path,String> fileToServerMap) {
+    Map<String,ArrayList<Path>> result = new HashMap<String,ArrayList<Path>>();
+    for (Entry<Path,String> fileServer : fileToServerMap.entrySet()) {
+      ArrayList<Path> files = result.get(fileServer.getValue());
       if (files == null) {
-        files = new ArrayList<String>();
-        serverToFileMap.put(fileServer.getValue(), files);
+        files = new ArrayList<Path>();
+        result.put(fileServer.getValue(), files);
       }
       files.add(fileServer.getKey());
     }
-    return serverToFileMap;
+    return result;
   }
   
-  private static int removeMetadataEntries(Map<String,String> fileToServerMap, Set<String> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+  private static int removeMetadataEntries(Map<Path,String> fileToServerMap, Set<Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
       InterruptedException {
     int count = 0;
     Iterator<LogEntry> iterator = MetadataTable.getLogEntries(SecurityConstants.getSystemCredentials());
     while (iterator.hasNext()) {
       for (String filename : iterator.next().logSet) {
-        filename = filename.split("/", 2)[1];
-        if (fileToServerMap.remove(filename) != null)
+        Path path;
+        if (filename.contains(":"))
+          path = new Path(filename);
+        else
+          path = new Path(ServerConstants.getWalDirs()[0] + filename);
+        
+        if (fileToServerMap.remove(path) != null)
           status.currentLog.inUse++;
         
-        sortedWALogs.remove(filename);
+        sortedWALogs.remove(path);
         
         count++;
       }
@@ -240,46 +243,52 @@ public class GarbageCollectWriteAheadLog
     return count;
   }
   
-  private int scanServers(Map<String,String> fileToServerMap) throws Exception {
-    Path walRoot = new Path(ServerConstants.getWalDirectory());
-    for (FileStatus status : fs.listStatus(walRoot)) {
-      String name = status.getPath().getName();
-      if (status.isDir()) {
-        for (FileStatus file : fs.listStatus(new Path(walRoot, name))) {
-          if (isUUID(file.getPath().getName()))
-            fileToServerMap.put(file.getPath().getName(), name);
-          else {
-            log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
+  private int scanServers(Map<Path,String> fileToServerMap) throws Exception {
+    Set<String> servers = new HashSet<String>();
+    for (String walDir : ServerConstants.getWalDirs()) {
+      Path walRoot = new Path(walDir);
+      FileStatus[] listing = fs.listStatus(walRoot);
+      if (listing == null)
+        continue;
+      for (FileStatus status : listing) {
+        String server = status.getPath().getName();
+        servers.add(server);
+        if (status.isDir()) {
+          for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
+            if (isUUID(file.getPath().getName()))
+              fileToServerMap.put(file.getPath(), server);
+            else {
+              log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
+            }
           }
+        } else if (isUUID(server)) {
+          // old-style WALs are not under a directory
+          fileToServerMap.put(status.getPath(), "");
+        } else {
+          log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
         }
-      } else if (isUUID(name)) {
-        // old-style WAL are not under a directory
-        fileToServerMap.put(name, "");
-      } else {
-        log.info("Ignoring file " + name + " because it doesn't look like a uuid");
       }
     }
-    
-    int count = 0;
-    return count;
+    return servers.size();
   }
   
-  private Set<String> getSortedWALogs() throws IOException {
-    Path recoveryDir = new Path(ServerConstants.getRecoveryDir());
+  private Set<Path> getSortedWALogs() throws IOException {
+    Set<Path> result = new HashSet<Path>();
     
-    Set<String> sortedWALogs = new HashSet<String>();
-    
-    if (fs.exists(recoveryDir)) {
-      for (FileStatus status : fs.listStatus(recoveryDir)) {
-        if (isUUID(status.getPath().getName())) {
-          sortedWALogs.add(status.getPath().getName());
-        } else {
-          log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
+    for (String dir : ServerConstants.getRecoveryDirs()) {
+      Path recoveryDir = new Path(dir);
+      
+      if (fs.exists(recoveryDir)) {
+        for (FileStatus status : fs.listStatus(recoveryDir)) {
+          if (isUUID(status.getPath().getName())) {
+            result.add(status.getPath());
+          } else {
+            log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
+          }
         }
       }
     }
-    
-    return sortedWALogs;
+    return result;
   }
   
   static private boolean isUUID(String name) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Wed Jun 19 20:18:30 2013
@@ -56,7 +56,6 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
 import org.apache.accumulo.core.gc.thrift.GCMonitorService.Processor;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
@@ -66,7 +65,6 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.CredentialHelper;
 import org.apache.accumulo.core.security.SecurityUtil;
 import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.MetadataTable;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.RootTable;
@@ -80,11 +78,11 @@ import org.apache.accumulo.server.Accumu
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.SecurityConstants;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.OfflineMetadataScanner;
 import org.apache.accumulo.server.util.TServerUtils;
 import org.apache.accumulo.server.util.TabletIterator;
 import org.apache.accumulo.server.zookeeper.ZooLock;
@@ -95,9 +93,7 @@ import org.apache.accumulo.trace.instrum
 import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 import org.apache.accumulo.trace.thrift.TInfo;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
@@ -128,8 +124,8 @@ public class SimpleGarbageCollector impl
   private TCredentials credentials;
   private long gcStartDelay;
   private boolean checkForBulkProcessingFiles;
-  private FileSystem fs;
-  private Trash trash = null;
+  private VolumeManager fs;
+  private boolean useTrash = true;
   private boolean safemode = false, offline = false, verbose = false;
   private String address = "localhost";
   private ZooLock lock;
@@ -146,7 +142,7 @@ public class SimpleGarbageCollector impl
     
     Instance instance = HdfsZooInstance.getInstance();
     ServerConfiguration serverConf = new ServerConfiguration(instance);
-    final FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), serverConf.getConfiguration());
+    final VolumeManager fs = VolumeManagerImpl.get();
     Accumulo.init(fs, serverConf, "gc");
     String address = "localhost";
     SimpleGarbageCollector gc = new SimpleGarbageCollector();
@@ -185,8 +181,8 @@ public class SimpleGarbageCollector impl
     this.address = address;
   }
   
-  public void init(FileSystem fs, Instance instance, TCredentials credentials, boolean noTrash) throws IOException {
-    this.fs = TraceFileSystem.wrap(fs);
+  public void init(VolumeManager fs, Instance instance, TCredentials credentials, boolean noTrash) throws IOException {
+    this.fs = fs;
     this.credentials = credentials;
     this.instance = instance;
     
@@ -200,9 +196,7 @@ public class SimpleGarbageCollector impl
     log.info("verbose: " + verbose);
     log.info("memory threshold: " + CANDIDATE_MEMORY_PERCENTAGE + " of " + Runtime.getRuntime().maxMemory() + " bytes");
     log.info("delete threads: " + numDeleteThreads);
-    if (!noTrash) {
-      this.trash = new Trash(fs, fs.getConf());
-    }
+    useTrash = !noTrash;
   }
   
   private void run() {
@@ -302,7 +296,7 @@ public class SimpleGarbageCollector impl
       // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
       try {
-        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, trash == null);
+        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, useTrash);
         log.info("Beginning garbage collection of write-ahead logs");
         walogCollector.collect(status);
       } catch (Exception e) {
@@ -332,10 +326,10 @@ public class SimpleGarbageCollector impl
   }
   
   private boolean moveToTrash(Path path) throws IOException {
-    if (trash == null)
+    if (!useTrash)
       return false;
     try {
-      return trash.moveToTrash(path);
+      return fs.moveToTrash(path);
     } catch (FileNotFoundException ex) {
       return false;
     }
@@ -367,21 +361,22 @@ public class SimpleGarbageCollector impl
       // if dir exist and is empty, then empty list is returned...
       // hadoop 1.0 will return null if the file doesn't exist
       // hadoop 2.0 will throw an exception if the file does not exist
-      FileStatus[] tabletDirs = null;
-      try {
-        tabletDirs = fs.listStatus(new Path(ServerConstants.getTablesDir() + "/" + delTableId));
-      } catch (FileNotFoundException ex) {
-        // ignored
-      }
-      
-      if (tabletDirs == null)
-        continue;
-      
-      if (tabletDirs.length == 0) {
-        Path p = new Path(ServerConstants.getTablesDir() + "/" + delTableId);
-        if (!moveToTrash(p))
-          fs.delete(p, false);
-      }
+      for (String dir : ServerConstants.getTablesDirs()) {
+        FileStatus[] tabletDirs = null;
+        try {
+          tabletDirs = fs.listStatus(new Path(dir + "/" + delTableId));
+        } catch (FileNotFoundException ex) {
+          // ignored 
+        }
+        if (tabletDirs == null)
+          continue;
+        
+        if (tabletDirs.length == 0) {
+          Path p = new Path(dir + "/" + delTableId);
+          if (!moveToTrash(p)) 
+            fs.delete(p);
+        }
+      } 
     }
   }
   
@@ -440,10 +435,12 @@ public class SimpleGarbageCollector impl
       checkForBulkProcessingFiles = true;
       try {
         for (String validExtension : FileOperations.getValidExtensions()) {
-          for (FileStatus stat : fs.globStatus(new Path(ServerConstants.getTablesDir() + "/*/*/*." + validExtension))) {
-            String cand = stat.getPath().toUri().getPath();
-            if (!cand.contains(ServerConstants.getRootTabletDir())) {
-              candidates.add(cand.substring(ServerConstants.getTablesDir().length()));
+          for (String dir : ServerConstants.getTablesDirs()) {
+            for (FileStatus stat : fs.globStatus(new Path(dir + "/*/*/*." + validExtension))) {
+              String cand = stat.getPath().toUri().getPath();
+              if (cand.contains(ServerConstants.getRootTabletDir()))
+                continue;
+              candidates.add(cand.substring(dir.length()));
               log.debug("Offline candidate: " + cand);
             }
           }
@@ -511,11 +508,13 @@ public class SimpleGarbageCollector impl
     
     Scanner scanner;
     if (offline) {
-      try {
-        scanner = new OfflineMetadataScanner(instance.getConfiguration(), fs);
-      } catch (IOException e) {
-        throw new IllegalStateException("Unable to create offline metadata scanner", e);
-      }
+      // TODO
+      throw new RuntimeException("Offline scanner no longer supported");
+//      try {
+//        scanner = new OfflineMetadataScanner(instance.getConfiguration(), fs);
+//      } catch (IOException e) {
+//        throw new IllegalStateException("Unable to create offline metadata scanner", e);
+//      }
     } else {
       try {
         scanner = new IsolatedScanner(instance.getConnector(credentials.getPrincipal(), CredentialHelper.extractToken(credentials)).createScanner(
@@ -563,7 +562,6 @@ public class SimpleGarbageCollector impl
     scanner.fetchColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY);
     scanner.fetchColumnFamily(MetadataTable.SCANFILE_COLUMN_FAMILY);
     MetadataTable.DIRECTORY_COLUMN.fetch(scanner);
-    
     TabletIterator tabletIterator = new TabletIterator(scanner, MetadataTable.KEYSPACE, false, true);
     
     while (tabletIterator.hasNext()) {
@@ -574,12 +572,17 @@ public class SimpleGarbageCollector impl
             || entry.getKey().getColumnFamily().equals(MetadataTable.SCANFILE_COLUMN_FAMILY)) {
           
           String cf = entry.getKey().getColumnQualifier().toString();
-          String delete;
-          if (cf.startsWith("../")) {
-            delete = cf.substring(2);
-          } else {
-            String table = new String(KeyExtent.tableOfMetadataRow(entry.getKey().getRow()));
-            delete = "/" + table + cf;
+          String delete = cf;
+          if (!cf.contains(":")) {
+            if (cf.startsWith("../")) {
+              delete = cf.substring(2);
+            } else {
+              String table = new String(KeyExtent.tableOfMetadataRow(entry.getKey().getRow()));
+              if (cf.startsWith("/"))
+                delete = "/" + table + cf;
+              else
+                delete = "/" + table + "/" + cf;
+            }
           }
           // WARNING: This line is EXTREMELY IMPORTANT.
           // You MUST REMOVE candidates that are still in use
@@ -603,7 +606,7 @@ public class SimpleGarbageCollector impl
   final static String METADATA_TABLE_DIR = "/" + MetadataTable.ID;
   
   private static void putMarkerDeleteMutation(final String delete, final BatchWriter writer, final BatchWriter rootWriter) throws MutationsRejectedException {
-    if (delete.startsWith(METADATA_TABLE_DIR)) {
+    if (delete.contains(METADATA_TABLE_DIR)) {
       Mutation m = new Mutation(new Text(RootTable.DELETE_FLAG_PREFIX + delete));
       m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
       rootWriter.addMutation(m);
@@ -669,26 +672,29 @@ public class SimpleGarbageCollector impl
         public void run() {
           boolean removeFlag;
           
-          String fullPath = ServerConstants.getTablesDir() + delete;
-          log.debug("Deleting " + fullPath);
           try {
+            Path fullPath;
+
+            if (delete.contains(":"))
+              fullPath = new Path(delete);
+            else
+              fullPath = fs.getFullPath(ServerConstants.getTablesDirs(), delete);
+            log.debug("Deleting " + fullPath);
             
-            Path p = new Path(fullPath);
-            
-            if (moveToTrash(p) || fs.delete(p, true)) {
+            if (moveToTrash(fullPath) || fs.deleteRecursively(fullPath)) {
               // delete succeeded, still want to delete
               removeFlag = true;
               synchronized (SimpleGarbageCollector.this) {
                 ++status.current.deleted;
               }
-            } else if (fs.exists(p)) {
+            } else if (fs.exists(fullPath)) {
               // leave the entry in the METADATA table; we'll try again
               // later
               removeFlag = false;
               synchronized (SimpleGarbageCollector.this) {
                 ++status.current.errors;
               }
-              log.warn("File exists, but was not deleted for an unknown reason: " + p);
+              log.warn("File exists, but was not deleted for an unknown reason: " + fullPath);
             } else {
               // this failure, we still want to remove the METADATA table
               // entry
@@ -698,14 +704,14 @@ public class SimpleGarbageCollector impl
               }
               String parts[] = delete.split("/");
               if (parts.length > 2) {
-                String tableId = parts[1];
-                String tabletDir = parts[2];
+                String tableId = parts[parts.length - 3];
+                String tabletDir = parts[parts.length - 2];
                 TableManager.getInstance().updateTableStateCache(tableId);
                 TableState tableState = TableManager.getInstance().getTableState(tableId);
                 if (tableState != null && tableState != TableState.DELETING) {
                   // clone directories don't always exist
                   if (!tabletDir.startsWith("c-"))
-                    log.warn("File doesn't exist: " + p);
+                    log.warn("File doesn't exist: " + fullPath);
                 }
               } else {
                 log.warn("Very strange path name: " + delete);

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java Wed Jun 19 20:18:30 2013
@@ -30,15 +30,11 @@ import java.util.regex.Pattern;
 import org.apache.accumulo.core.cli.Help;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.tabletserver.log.DfsLogger;
 import org.apache.accumulo.server.tabletserver.log.MultiReader;
-import org.apache.accumulo.server.trace.TraceFileSystem;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
@@ -70,9 +66,7 @@ public class LogReader {
   public static void main(String[] args) throws IOException {
     Opts opts = new Opts();
     opts.parseArgs(LogReader.class.getName(), args);
-    Configuration conf = CachedConfiguration.getInstance();
-    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration()));
-    FileSystem local = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+    VolumeManager fs = VolumeManagerImpl.get();
     
     Matcher rowMatcher = null;
     KeyExtent ke = null;
@@ -117,25 +111,9 @@ public class LogReader {
         } finally {
           f.close();
         }
-      } else if (local.isFile(path)) {
-        // read log entries from a simple file
-        FSDataInputStream f = DfsLogger.readHeader(fs, path, meta);
-        try {
-          while (true) {
-            try {
-              key.readFields(f);
-              value.readFields(f);
-            } catch (EOFException ex) {
-              break;
-            }
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-          }
-        } finally {
-          f.close();
-        }
       } else {
         // read the log entries sorted in a map file
-        MultiReader input = new MultiReader(fs, conf, file);
+        MultiReader input = new MultiReader(fs, path);
         while (input.next(key, value)) {
           printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
         }



Mime
View raw message