accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1493756 [1/2] - in /accumulo/branches/ACCUMULO-118: core/src/main/java/org/apache/accumulo/core/ core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apac...
Date Mon, 17 Jun 2013 13:26:43 GMT
Author: ecn
Date: Mon Jun 17 13:26:43 2013
New Revision: 1493756

URL: http://svn.apache.org/r1493756
Log:
ACCUMULO-118: fixed log recovery and the du (disk usage) command

Removed:
    accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java
Modified:
    accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/Constants.java
    accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
    accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
    accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
    accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
    accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
    accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
    accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java
    accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java

Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/Constants.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/Constants.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/Constants.java (original)
+++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/Constants.java Mon Jun 17 13:26:43 2013
@@ -178,34 +178,9 @@ public class Constants {
     return conf.get(Property.INSTANCE_DFS_DIR);
   }
   
-  public static String getTablesDir(final AccumuloConfiguration conf) {
-    return getBaseDir(conf) + "/tables";
-  }
-  
-  public static String getRecoveryDir(final AccumuloConfiguration conf) {
-    return getBaseDir(conf) + "/recovery";
-  }
-  
   public static Path getDataVersionLocation(final AccumuloConfiguration conf) {
     return new Path(getBaseDir(conf) + "/version");
   }
   
-  public static String getMetadataTableDir(final AccumuloConfiguration conf) {
-    return getTablesDir(conf) + "/" + METADATA_TABLE_ID;
-  }
-  
-  public static String getRootTabletDir(final AccumuloConfiguration conf) {
-    return getMetadataTableDir(conf) + ZROOT_TABLET;
-  }
-
-  
-  /**
-   * @param conf
-   * @return The write-ahead log directory.
-   */
-  public static String getWalDirectory(final AccumuloConfiguration conf) {
-    return getBaseDir(conf) + "/wal";
-  }
-
-    public static final String AUDITLOG = "Audit";
+  public static final String AUDITLOG = "Audit";
 }

Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java (original)
+++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperationsImpl.java Mon Jun 17 13:26:43 2013
@@ -1274,7 +1274,6 @@ public class TableOperationsImpl extends
     
     try {
       FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), instance.getConfiguration());
-      ;
       Map<String,String> props = getExportedProps(fs, new Path(importDir, Constants.EXPORT_FILE));
       
       for (String propKey : props.keySet()) {

Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java (original)
+++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineScanner.java Mon Jun 17 13:26:43 2013
@@ -229,14 +229,20 @@ class OfflineIterator implements Iterato
     
     if (currentExtent != null && !extent.isPreviousExtent(currentExtent))
       throw new AccumuloException(" " + currentExtent + " is not previous extent " + extent);
-    
-    String tablesDir = Constants.getTablesDir(instance.getConfiguration());
+
+    // TODO: ACCUMULO-118 needs fullpaths
     List<String> absFiles = new ArrayList<String>();
     for (String relPath : relFiles) {
-      if (relPath.startsWith(".."))
-        absFiles.add(tablesDir + relPath.substring(2));
-      else
-        absFiles.add(tablesDir + "/" + tableId + relPath);
+      if (relFiles.contains(":")) {
+        absFiles.add(relPath);
+      } else {
+        throw new RuntimeException("Unimplemented: offline scanner over relative paths");
+//        if (relPath.startsWith("..")) {
+//          absFiles.add(fs.getFullPath(tablesDir + relPath.substring(2));
+//        } else {
+//          absFiles.add(tablesDir + "/" + tableId + relPath);
+//        }
+      }
     }
     
     iter = createIterator(extent, absFiles);

Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java (original)
+++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java Mon Jun 17 13:26:43 2013
@@ -437,6 +437,8 @@ public class ThriftScanner {
       List<KeyValue> results = new ArrayList<KeyValue>(sr.results.size());
       for (TKeyValue tkv : sr.results)
         results.add(new KeyValue(new Key(tkv.key), tkv.value));
+      for (KeyValue r : results)
+        log.trace("Got result " + r);
       
       return results;
       

Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java (original)
+++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java Mon Jun 17 13:26:43 2013
@@ -306,11 +306,7 @@ public class MockTableOperations extends
   public List<DiskUsage> getDiskUsage(Set<String> tables) throws AccumuloException, AccumuloSecurityException {
 
     List<DiskUsage> diskUsages = new ArrayList<DiskUsage>();
-    for(String table : tables) {
-      TreeSet<String> tree = new TreeSet<String>();
-      tree.add(table);
-      diskUsages.add(new DiskUsage(tree, 1l));
-    }
+    diskUsages.add(new DiskUsage(new TreeSet<String>(tables), 0l));
 
     return diskUsages;
   }

Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java Mon Jun 17 13:26:43 2013
@@ -70,8 +70,7 @@ public enum Property {
       "A secret unique to a given instance that all servers must know in order to communicate with one another."
           + " Change it before initialization. To change it later use ./bin/accumulo accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd], "
           + " and then update conf/accumulo-site.xml everywhere."),
-  INSTANCE_NAMESPACES("instance.namespaces", "", PropertyType.STRING,
-      "A list of namespaces to use."),
+  INSTANCE_NAMESPACES("instance.namespaces", "", PropertyType.STRING, "A list of namespaces to use."),
   INSTANCE_SECURITY_AUTHENTICATOR("instance.security.authenticator", "org.apache.accumulo.server.security.handler.ZKAuthenticator", PropertyType.CLASSNAME,
       "The authenticator class that accumulo will use to determine if a user has privilege to perform an action"),
   INSTANCE_SECURITY_AUTHORIZOR("instance.security.authorizor", "org.apache.accumulo.server.security.handler.ZKAuthorizor", PropertyType.CLASSNAME,

Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java (original)
+++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DUCommand.java Mon Jun 17 13:26:43 2013
@@ -22,17 +22,13 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.util.TableDiskUsage;
-import org.apache.accumulo.core.util.TableDiskUsage.Printer;
+import org.apache.accumulo.core.client.admin.DiskUsage;
+import org.apache.accumulo.core.util.NumUtil;
 import org.apache.accumulo.core.util.shell.Shell;
 import org.apache.accumulo.core.util.shell.Shell.Command;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 
 public class DUCommand extends Command {
   
@@ -51,22 +47,17 @@ public class DUCommand extends Command {
         }
       }
     } else {
-      shellState.checkTableState();
-      tablesToFlush.add(shellState.getTableName());
+      if (tablesToFlush.isEmpty()) {
+        shellState.checkTableState();
+        tablesToFlush.add(shellState.getTableName());
+      }
     }
     try {
-      final AccumuloConfiguration acuConf = new ConfigurationCopy(shellState.getConnector().instanceOperations().getSystemConfiguration());
-      TableDiskUsage.printDiskUsage(acuConf, tablesToFlush, FileSystem.get(new Configuration()), shellState.getConnector(), new Printer() {
-        @Override
-        public void print(String line) {
-          try {
-            shellState.getReader().println(line);
-          } catch (IOException ex) {
-            throw new RuntimeException(ex);
-          }
-        }
-        
-      }, prettyPrint);
+      String valueFormat = prettyPrint ? "%9s" : "%,24d";
+      for (DiskUsage usage : shellState.getConnector().tableOperations().getDiskUsage(tablesToFlush)) {
+        Object value = prettyPrint ? NumUtil.bigNumberForSize(usage.getUsage()) : usage.getUsage();
+        shellState.getReader().println(String.format(valueFormat + " %s", value, usage.getTables()));
+      }
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java Mon Jun 17 13:26:43 2013
@@ -16,23 +16,31 @@
  */
 package org.apache.accumulo.server;
 
+import static org.apache.accumulo.core.Constants.METADATA_TABLE_ID;
+import static org.apache.accumulo.core.Constants.ZROOT_TABLET;
+
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
-import static org.apache.accumulo.core.Constants.*;
-
 public class ServerConstants {
+
   // these are functions to delay loading the Accumulo configuration unless we must
   public static String[] getBaseDirs() {
     String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
     String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_NAMESPACES);
-    if (ns == null) {
-      return new String[] { singleNamespace };
+    if (ns == null || ns.isEmpty()) {
+      Configuration hadoopConfig = CachedConfiguration.getInstance();
+      String fullPath = hadoopConfig.get("fs.default.name") + singleNamespace;
+      return new String[] { fullPath };
     }
     String namespaces[] = ns.split(",");
     if (namespaces.length < 2) {
-      return new String[] { singleNamespace };
+      Configuration hadoopConfig = CachedConfiguration.getInstance();
+      String fullPath = hadoopConfig.get("fs.default.name") + singleNamespace;
+      return new String[] { fullPath };
     }
     return prefix(namespaces, singleNamespace);
   }
@@ -53,6 +61,14 @@ public class ServerConstants {
     return prefix(getBaseDirs(), "recovery");
   }
   
+  public static String[] getWalDirs() {
+    return prefix(getBaseDirs(), "wal");
+  }
+  
+  public static String[] getWalogArchives() {
+    return prefix(getBaseDirs(), "walogArchive");
+  }
+  
   public static Path getInstanceIdLocation() {
     return new Path(ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR) + "/instance_id");
   }
@@ -68,5 +84,5 @@ public class ServerConstants {
   public static String getRootTabletDir() {
     return prefix(getMetadataTableDirs(), ZROOT_TABLET)[0];
   }
-  
+
 }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java Mon Jun 17 13:26:43 2013
@@ -52,10 +52,10 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.TableDiskUsage;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.util.TableDiskUsage;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 import org.apache.accumulo.trace.thrift.TInfo;
@@ -339,7 +339,7 @@ public class ClientServiceHandler implem
       FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), conf);
       
       // use the same set of tableIds that were validated above to avoid race conditions
-      Map<TreeSet<String>,Long> diskUsage = TableDiskUsage.getDiskUsage(new ServerConfiguration(instance).getConfiguration(), tableIds, fs, conn, false);
+      Map<TreeSet<String>,Long> diskUsage = TableDiskUsage.getDiskUsage(new ServerConfiguration(instance).getConfiguration(), tableIds, fs, conn);
       List<TDiskUsage> retUsages = new ArrayList<TDiskUsage>();
       for (Map.Entry<TreeSet<String>,Long> usageItem : diskUsage.entrySet()) {
         retUsages.add(new TDiskUsage(new ArrayList<String>(usageItem.getKey()), usageItem.getValue()));

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java Mon Jun 17 13:26:43 2013
@@ -32,7 +32,7 @@ public class FileRef implements Comparab
   
   public FileRef(FileSystem fs, Key key) {
     metaReference = key.getColumnQualifier().toString();
-    fullReference = new Path(fs.getFullPath(key));
+    fullReference = fs.getFullPath(key);
   }
   
   public FileRef(String metaReference, Path fullReference) {

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java Mon Jun 17 13:26:43 2013
@@ -109,10 +109,10 @@ public interface FileSystem {
   FileStatus[] globStatus(Path path) throws IOException;
 
   // Convert a file or directory !METADATA reference into a path
-  String getFullPath(Key key);
+  Path getFullPath(Key key);
   
   // Given a filename, figure out the qualified path given multiple namespaces
-  String getFullPath(String paths[], String fileName) throws IOException;
+  Path getFullPath(String paths[], String fileName) throws IOException;
 
   // forward to the appropriate FileSystem object
   ContentSummary getContentSummary(String dir);

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java Mon Jun 17 13:26:43 2013
@@ -23,15 +23,17 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.commons.lang.NotImplementedException;
@@ -307,9 +309,24 @@ public class FileSystemImpl implements o
     AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
     return get(conf);
   }
+  
+  static private final String DEFAULT = "";
 
   public static org.apache.accumulo.server.fs.FileSystem get(AccumuloConfiguration conf) throws IOException {
-    return new FileSystemImpl(Collections.singletonMap("", FileSystem.get(CachedConfiguration.getInstance())), "", conf);
+    Map<String, FileSystem> fileSystems = new HashMap<String, FileSystem>();
+    Configuration hadoopConf = CachedConfiguration.getInstance();
+    fileSystems.put(DEFAULT, FileSystem.get(hadoopConf));
+    String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_NAMESPACES);
+    if (ns != null) {
+      for (String space : ns.split(",")) {
+        if (space.contains(":")) {
+          fileSystems.put(space, new Path(space).getFileSystem(hadoopConf));
+        } else {
+          fileSystems.put(space, FileSystem.get(hadoopConf));
+        }
+      }
+    }
+    return new FileSystemImpl(fileSystems, "", conf);
   }
 
   @Override
@@ -364,11 +381,11 @@ public class FileSystemImpl implements o
   }
   
   @Override
-  public String getFullPath(Key key) {
+  public Path getFullPath(Key key) {
     
     String relPath = key.getColumnQualifierData().toString();
     if (relPath.contains(":"))
-      return relPath;
+      return new Path(relPath);
    
     byte [] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
     
@@ -376,10 +393,9 @@ public class FileSystemImpl implements o
       relPath = relPath.substring(2);
     else
       relPath = "/" + new String(tableId) + relPath;
-    String fullPath = Constants.getTablesDir(conf) + relPath;
+    String fullPath = ServerConstants.getTablesDirs()[0] + relPath;
     FileSystem ns = getFileSystemByPath(fullPath);
-    String result = ns.makeQualified(new Path(fullPath)).toString();
-    return result;
+    return ns.makeQualified(new Path(fullPath));
   }
 
   @Override
@@ -404,20 +420,26 @@ public class FileSystemImpl implements o
   }
 
   @Override
-  public String getFullPath(String[] paths, String fileName) throws IOException {
+  public Path getFullPath(String[] paths, String fileName) throws IOException {
     if (fileName.contains(":"))
-      return fileName;
+      return new Path(fileName);
+    // TODO: ACCUMULO-118
+    // How do we want it to work?  Find it somewhere? or find it in the default file system?
     // old-style name, on one of many possible "root" paths:
     if (fileName.startsWith("../"))
       fileName = fileName.substring(2);
     for (String path : paths) {
-      String fullPath = path + fileName;
+      String fullPath;
+      if (path.endsWith("/") || fileName.startsWith("/"))
+        fullPath = path + fileName;
+      else
+        fullPath = path + "/" + fileName;
       FileSystem ns = getFileSystemByPath(fullPath);
       Path exists = new Path(fullPath);
       if (ns.exists(exists))
-        return ns.makeQualified(exists).toString();
+        return ns.makeQualified(exists);
     }
-    throw new RuntimeException("Could not find file " + fileName + " in " + Arrays.asList(paths));
+    throw new IOException("Could not find file " + fileName + " in " + Arrays.asList(paths));
   }
 
   @Override

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Mon Jun 17 13:26:43 2013
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.tabletse
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.AddressUtil;
@@ -71,11 +72,11 @@ public class GarbageCollectWriteAheadLog
     Span span = Trace.start("scanServers");
     try {
       
-      Set<String> sortedWALogs = getSortedWALogs();
+      Set<Path> sortedWALogs = getSortedWALogs();
       
       status.currentLog.started = System.currentTimeMillis();
       
-      Map<String,String> fileToServerMap = new HashMap<String,String>();
+      Map<Path,String> fileToServerMap = new HashMap<Path,String>();
       int count = scanServers(fileToServerMap);
       long fileScanStop = System.currentTimeMillis();
       log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count,
@@ -97,7 +98,7 @@ public class GarbageCollectWriteAheadLog
       log.info(String.format("%d log entries scanned in %.2f seconds", count, (logEntryScanStop - fileScanStop) / 1000.));
       
       span = Trace.start("removeFiles");
-      Map<String,ArrayList<String>> serverToFileMap = mapServersToFiles(fileToServerMap);
+      Map<String,ArrayList<Path>> serverToFileMap = mapServersToFiles(fileToServerMap);
       
       count = removeFiles(serverToFileMap, sortedWALogs, status);
       
@@ -127,39 +128,36 @@ public class GarbageCollectWriteAheadLog
     }
   }
   
-  private int removeFiles(Map<String,ArrayList<String>> serverToFileMap, Set<String> sortedWALogs, final GCStatus status) {
+  private int removeFiles(Map<String,ArrayList<Path>> serverToFileMap, Set<Path> sortedWALogs, final GCStatus status) {
     AccumuloConfiguration conf = instance.getConfiguration();
-    for (Entry<String,ArrayList<String>> entry : serverToFileMap.entrySet()) {
-      if (entry.getKey().length() == 0) {
+    for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
+      if (entry.getKey().isEmpty()) {
         // old-style log entry, just remove it
-        for (String filename : entry.getValue()) {
-          log.debug("Removing old-style WAL " + entry.getValue());
+        for (Path path : entry.getValue()) {
+          log.debug("Removing old-style WAL " + path);
           try {
-            Path path = new Path(Constants.getWalDirectory(conf), filename);
             if (!useTrash || !fs.moveToTrash(path))
               fs.deleteRecursively(path);
             status.currentLog.deleted++;
           } catch (FileNotFoundException ex) {
             // ignored
           } catch (IOException ex) {
-            log.error("Unable to delete wal " + filename + ": " + ex);
+            log.error("Unable to delete wal " + path + ": " + ex);
           }
         }
       } else {
         InetSocketAddress address = AddressUtil.parseAddress(entry.getKey());
         if (!holdsLock(address)) {
-          Path serverPath = new Path(Constants.getWalDirectory(conf), entry.getKey());
-          for (String filename : entry.getValue()) {
-            log.debug("Removing WAL for offline server " + filename);
+          for (Path path : entry.getValue()) {
+            log.debug("Removing WAL for offline server " + path);
             try {
-              Path path = new Path(serverPath, filename);
               if (!useTrash || !fs.moveToTrash(path))
                 fs.deleteRecursively(path);
               status.currentLog.deleted++;
             } catch (FileNotFoundException ex) {
               // ignored
             } catch (IOException ex) {
-              log.error("Unable to delete wal " + filename + ": " + ex);
+              log.error("Unable to delete wal " + path + ": " + ex);
             }
           }
           continue;
@@ -167,7 +165,7 @@ public class GarbageCollectWriteAheadLog
           Client tserver = null;
           try {
             tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-            tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), entry.getValue());
+            tserver.removeLogs(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), paths2strings(entry.getValue()));
             log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
             status.currentLog.deleted += entry.getValue().size();
           } catch (TException e) {
@@ -180,11 +178,8 @@ public class GarbageCollectWriteAheadLog
       }
     }
     
-    Path recoveryDir = new Path(Constants.getRecoveryDir(conf));
-    
-    for (String sortedWALog : sortedWALogs) {
-      log.debug("Removing sorted WAL " + sortedWALog);
-      Path swalog = new Path(recoveryDir, sortedWALog);
+    for (Path swalog : sortedWALogs) {
+      log.debug("Removing sorted WAL " + swalog);
       try {
         if (!useTrash || !fs.moveToTrash(swalog)) {
           fs.deleteRecursively(swalog);
@@ -194,10 +189,10 @@ public class GarbageCollectWriteAheadLog
       } catch (IOException ioe) {
         try {
           if (fs.exists(swalog)) {
-            log.error("Unable to delete sorted walog " + sortedWALog + ": " + ioe);
+            log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
           }
         } catch (IOException ex) {
-          log.error("Unable to check for the existence of " + sortedWALog, ex);
+          log.error("Unable to check for the existence of " + swalog, ex);
         }
       }
     }
@@ -205,30 +200,42 @@ public class GarbageCollectWriteAheadLog
     return 0;
   }
   
-  private static Map<String,ArrayList<String>> mapServersToFiles(Map<String,String> fileToServerMap) {
-    Map<String,ArrayList<String>> serverToFileMap = new HashMap<String,ArrayList<String>>();
-    for (Entry<String,String> fileServer : fileToServerMap.entrySet()) {
-      ArrayList<String> files = serverToFileMap.get(fileServer.getValue());
+  private List<String> paths2strings(ArrayList<Path> paths) {
+    List<String> result = new ArrayList<String>(paths.size());
+    for (Path path : paths)
+      result.add(path.toString());
+    return result;
+  }
+
+  private static Map<String,ArrayList<Path>> mapServersToFiles(Map<Path,String> fileToServerMap) {
+    Map<String,ArrayList<Path>> result = new HashMap<String,ArrayList<Path>>();
+    for (Entry<Path,String> fileServer : fileToServerMap.entrySet()) {
+      ArrayList<Path> files = result.get(fileServer.getValue());
       if (files == null) {
-        files = new ArrayList<String>();
-        serverToFileMap.put(fileServer.getValue(), files);
+        files = new ArrayList<Path>();
+        result.put(fileServer.getValue(), files);
       }
       files.add(fileServer.getKey());
     }
-    return serverToFileMap;
+    return result;
   }
   
-  private static int removeMetadataEntries(Map<String,String> fileToServerMap, Set<String> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+  private static int removeMetadataEntries(Map<Path,String> fileToServerMap, Set<Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
       InterruptedException {
     int count = 0;
     Iterator<LogEntry> iterator = MetadataTable.getLogEntries(SecurityConstants.getSystemCredentials());
     while (iterator.hasNext()) {
       for (String filename : iterator.next().logSet) {
-        filename = filename.split("/", 2)[1];
-        if (fileToServerMap.remove(filename) != null)
+        Path path;
+        if (filename.contains(":"))
+          path = new Path(filename);
+        else
+          path = new Path(ServerConstants.getWalDirs()[0] + filename);
+        
+        if (fileToServerMap.remove(path) != null)
           status.currentLog.inUse++;
         
-        sortedWALogs.remove(filename);
+        sortedWALogs.remove(path);
         
         count++;
       }
@@ -236,48 +243,52 @@ public class GarbageCollectWriteAheadLog
     return count;
   }
   
-  private int scanServers(Map<String,String> fileToServerMap) throws Exception {
-    AccumuloConfiguration conf = instance.getConfiguration();
-    Path walRoot = new Path(Constants.getWalDirectory(conf));
-    for (FileStatus status : fs.listStatus(walRoot)) {
-      String name = status.getPath().getName();
-      if (status.isDir()) {
-        for (FileStatus file : fs.listStatus(new Path(walRoot, name))) {
-          if (isUUID(file.getPath().getName()))
-            fileToServerMap.put(file.getPath().getName(), name);
-          else {
-            log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
+  private int scanServers(Map<Path,String> fileToServerMap) throws Exception {
+    Set<String> servers = new HashSet<String>();
+    for (String walDir : ServerConstants.getWalDirs()) {
+      Path walRoot = new Path(walDir);
+      FileStatus[] listing = fs.listStatus(walRoot);
+      if (listing == null)
+        continue;
+      for (FileStatus status : listing) {
+        String server = status.getPath().getName();
+        servers.add(server);
+        if (status.isDir()) {
+          for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
+            if (isUUID(file.getPath().getName()))
+              fileToServerMap.put(file.getPath(), server);
+            else {
+              log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid");
+            }
           }
+        } else if (isUUID(server)) {
+          // old-style WAL are not under a directory
+          fileToServerMap.put(status.getPath(), "");
+        } else {
+          log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
         }
-      } else if (isUUID(name)) {
-        // old-style WAL are not under a directory
-        fileToServerMap.put(name, "");
-      } else {
-        log.info("Ignoring file " + name + " because it doesn't look like a uuid");
       }
     }
-    
-    int count = 0;
-    return count;
+    return servers.size();
   }
   
-  private Set<String> getSortedWALogs() throws IOException {
-    AccumuloConfiguration conf = instance.getConfiguration();
-    Path recoveryDir = new Path(Constants.getRecoveryDir(conf));
-    
-    Set<String> sortedWALogs = new HashSet<String>();
+  private Set<Path> getSortedWALogs() throws IOException {
+    Set<Path> result = new HashSet<Path>();
     
-    if (fs.exists(recoveryDir)) {
-      for (FileStatus status : fs.listStatus(recoveryDir)) {
-        if (isUUID(status.getPath().getName())) {
-          sortedWALogs.add(status.getPath().getName());
-        } else {
-          log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
+    for (String dir : ServerConstants.getRecoveryDirs()) {
+      Path recoveryDir = new Path(dir);
+      
+      if (fs.exists(recoveryDir)) {
+        for (FileStatus status : fs.listStatus(recoveryDir)) {
+          if (isUUID(status.getPath().getName())) {
+            result.add(status.getPath());
+          } else {
+            log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid");
+          }
         }
       }
     }
-    
-    return sortedWALogs;
+    return result;
   }
   
   /**

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Mon Jun 17 13:26:43 2013
@@ -572,7 +572,10 @@ public class SimpleGarbageCollector impl
             delete = cf.substring(2);
           } else {
             String table = new String(KeyExtent.tableOfMetadataRow(entry.getKey().getRow()));
-            delete = "/" + table + cf;
+            if (cf.startsWith("/"))
+              delete = "/" + table + cf;
+            else
+              delete = "/" + table + "/" + cf;
           }
           // WARNING: This line is EXTREMELY IMPORTANT.
           // You MUST REMOVE candidates that are still in use
@@ -662,25 +665,27 @@ public class SimpleGarbageCollector impl
           boolean removeFlag;
           
           try {
-            String fullPath = fs.getFullPath(ServerConstants.getTablesDirs(), delete);
+            Path fullPath;
+            if (delete.contains(":"))
+              fullPath = new Path(delete.split("/", 3)[2]);
+            else
+              fullPath = fs.getFullPath(ServerConstants.getTablesDirs(), delete);
             log.debug("Deleting " + fullPath);
             
-            Path p = new Path(fullPath);
-            
-            if (moveToTrash(p) || fs.deleteRecursively(p)) {
+            if (moveToTrash(fullPath) || fs.deleteRecursively(fullPath)) {
               // delete succeeded, still want to delete
               removeFlag = true;
               synchronized (SimpleGarbageCollector.this) {
                 ++status.current.deleted;
               }
-            } else if (fs.exists(p)) {
+            } else if (fs.exists(fullPath)) {
               // leave the entry in the METADATA table; we'll try again
               // later
               removeFlag = false;
               synchronized (SimpleGarbageCollector.this) {
                 ++status.current.errors;
               }
-              log.warn("File exists, but was not deleted for an unknown reason: " + p);
+              log.warn("File exists, but was not deleted for an unknown reason: " + fullPath);
             } else {
               // this failure, we still want to remove the METADATA table
               // entry
@@ -697,7 +702,7 @@ public class SimpleGarbageCollector impl
                 if (tableState != null && tableState != TableState.DELETING) {
                   // clone directories don't always exist
                   if (!tabletDir.startsWith("c-"))
-                    log.warn("File doesn't exist: " + p);
+                    log.warn("File doesn't exist: " + fullPath);
                 }
               } else {
                 log.warn("Very strange path name: " + delete);

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java Mon Jun 17 13:26:43 2013
@@ -113,7 +113,7 @@ public class LogReader {
         }
       } else {
         // read the log entries sorted in a map file
-        MultiReader input = new MultiReader(fs, file);
+        MultiReader input = new MultiReader(fs, path);
         while (input.next(key, value)) {
           printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
         }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java Mon Jun 17 13:26:43 2013
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -34,10 +35,10 @@ import org.apache.accumulo.core.conf.Pro
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
@@ -66,38 +67,39 @@ public class RecoveryManager {
   }
 
   private class LogSortTask implements Runnable {
-    private String filename;
-    private String host;
+    private String source;
+    private String destination;
+    private String sortId;
     private LogCloser closer;
     
-    public LogSortTask(LogCloser closer, String host, String filename) {
+    public LogSortTask(LogCloser closer, String source, String destination, String sortId) {
       this.closer = closer;
-      this.host = host;
-      this.filename = filename;
+      this.source = source;
+      this.destination = destination;
+      this.sortId = sortId;
     }
 
     @Override
     public void run() {
       boolean rescheduled = false;
       try {
-        FileSystem localFs = master.getFileSystem();
       
-        long time = closer.close(master, localFs, getSource(host, filename));
+        long time = closer.close(master, master.getFileSystem(), new Path(source));
       
         if (time > 0) {
           executor.schedule(this, time, TimeUnit.MILLISECONDS);
           rescheduled = true;
         } else {
-          initiateSort(host, filename);
+          initiateSort(sortId, source, destination);
         }
       } catch (FileNotFoundException e) {
-        log.debug("Unable to initate log sort for " + filename + ": " + e);
+        log.debug("Unable to initate log sort for " + source + ": " + e);
       } catch (Exception e) {
-        log.warn("Failed to initiate log sort " + filename, e);
+        log.warn("Failed to initiate log sort " + source, e);
       } finally {
         if (!rescheduled) {
           synchronized (RecoveryManager.this) {
-            closeTasksQueued.remove(filename);
+            closeTasksQueued.remove(sortId);
           }
         }
       }
@@ -105,62 +107,62 @@ public class RecoveryManager {
     
   }
   
-  private void initiateSort(String host, final String file) throws KeeperException, InterruptedException {
-    String source = getSource(host, file).toString();
-    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(file, source.getBytes());
+  private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException {
+    String work =  source + "|" + destination; 
+    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes());
     
     synchronized (this) {
-      sortsQueued.add(file);
+      sortsQueued.add(sortId);
     }
 
-    final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;
-    log.info("Created zookeeper entry " + path + " with data " + source);
+    final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId;
+    log.info("Created zookeeper entry " + path + " with data " + work);
   }
   
-  private Path getSource(String server, String file) {
-    String source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + server + "/" + file;
-    if (server.contains(":")) {
-      // old-style logger log, copied from local file systems by tservers, unsorted into the wal base dir
-      source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + file;
-    }
-    return new Path(source);
-  }
+  Random random = new Random();
 
   public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
     boolean recoveryNeeded = false;
     for (Collection<String> logs : walogs) {
       for (String walog : logs) {
-        String parts[] = walog.split("/");
-        String host = parts[0];
-        String filename = parts[1];
+        String hostFilename[] = walog.split("/", 2);
+        String host = hostFilename[0];
+        String filename = hostFilename[1];
+        String parts[] = filename.split("/");
+        String sortId = parts[parts.length - 1];
+        // TODO: ACCUMULO-118: choose recovery directory with extension
+        String[] dirs = ServerConstants.getRecoveryDirs();
+        String recoveryDir = dirs[random.nextInt(dirs.length)];
+        String dest = recoveryDir + "/" + sortId;
+        log.debug("Recovering " + filename + " to " + dest + " using sortId " + sortId);
         
         boolean sortQueued;
         synchronized (this) {
-          sortQueued = sortsQueued.contains(filename);
+          sortQueued = sortsQueued.contains(sortId);
         }
         
-        if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + filename) == null) {
+        if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId) == null) {
           synchronized (this) {
-            sortsQueued.remove(filename);
+            sortsQueued.remove(sortId);
           }
         }
 
-        if (master.getFileSystem().exists(new Path(Constants.getRecoveryDir(master.getSystemConfiguration()) + "/" + filename + "/finished"))) {
+        if (master.getFileSystem().exists(new Path(dest, "finished"))) {
           synchronized (this) {
-            closeTasksQueued.remove(filename);
-            recoveryDelay.remove(filename);
-            sortsQueued.remove(filename);
+            closeTasksQueued.remove(sortId);
+            recoveryDelay.remove(sortId);
+            sortsQueued.remove(sortId);
           }
           continue;
         }
         
         recoveryNeeded = true;
         synchronized (this) {
-          if (!closeTasksQueued.contains(filename) && !sortsQueued.contains(filename)) {
+          if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
             AccumuloConfiguration aconf = master.getConfiguration().getConfiguration();
             LogCloser closer = Master.createInstanceFromPropertyName(aconf, Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class,
                 new HadoopLogCloser());
-            Long delay = recoveryDelay.get(filename);
+            Long delay = recoveryDelay.get(sortId);
             if (delay == null) {
               delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
             } else {
@@ -169,9 +171,9 @@ public class RecoveryManager {
 
             log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference");
 
-            executor.schedule(new LogSortTask(closer, host, filename), delay, TimeUnit.MILLISECONDS);
-            closeTasksQueued.add(filename);
-            recoveryDelay.put(filename, delay);
+            executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
+            closeTasksQueued.add(sortId);
+            recoveryDelay.put(sortId, delay);
           }
         }
       }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java Mon Jun 17 13:26:43 2013
@@ -218,7 +218,7 @@ class WriteExportFiles extends MasterRep
       entry.getValue().write(dataOut);
       
       if (entry.getKey().getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
-        String path = fs.getFullPath(entry.getKey());
+        String path = fs.getFullPath(entry.getKey()).toString();
         String tokens[] = path.split("/");
         if (tokens.length < 1) {
           throw new RuntimeException("Illegal path " + path);

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java Mon Jun 17 13:26:43 2013
@@ -52,6 +52,7 @@ import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -61,7 +62,6 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.util.TablePropUtil;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
@@ -314,7 +314,7 @@ class MapImportFileNames extends MasterR
       FileSystem fs = environment.getFileSystem();
       
       fs.mkdirs(new Path(tableInfo.importDir));
-      
+
       FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
       
       UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
@@ -323,7 +323,7 @@ class MapImportFileNames extends MasterR
       
       for (FileStatus fileStatus : files) {
         String fileName = fileStatus.getPath().getName();
-        
+        log.info("filename " + fileStatus.getPath().toString());
         String sa[] = fileName.split("\\.");
         String extension = "";
         if (sa.length > 1) {

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java Mon Jun 17 13:26:43 2013
@@ -16,23 +16,27 @@
  */
 package org.apache.accumulo.server.master.tableOps;
 
+import java.util.Map.Entry;
+
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.fate.Repo;
+import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.MergeInfo;
 import org.apache.accumulo.server.master.state.MergeInfo.Operation;
 import org.apache.accumulo.server.master.state.MergeState;
 import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -57,13 +61,14 @@ class MakeDeleteEntries extends MasterRe
   public Repo<Master> call(long tid, Master master) throws Exception {
     log.info("creating delete entries for merged metadata tablets");
     Connector conn = master.getConnector();
+    Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+    scanner.setRange(Constants.METADATA_ROOT_TABLET_KEYSPACE);
+    scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
     BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, new BatchWriterConfig());
-    String tableDir = Constants.getMetadataTableDir(master.getConfiguration().getConfiguration());
-    for (FileStatus fs : master.getFileSystem().listStatus(new Path(tableDir))) {
+    for (Entry<Key,Value> entry : scanner) {
       // TODO: add the entries only if there are no !METADATA table references - ACCUMULO-1308
-      if (fs.isDir() && fs.getPath().getName().matches("^" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + ".*")) {
-        bw.addMutation(MetadataTable.createDeleteMutation(Constants.METADATA_TABLE_ID, "/" + fs.getPath().getName()));
-      }
+      FileRef ref = new FileRef(master.getFileSystem(), entry.getKey());
+      bw.addMutation(MetadataTable.createDeleteMutation(Constants.METADATA_TABLE_ID, ref.path().toString()));
     }
     bw.close();
     return null;

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java Mon Jun 17 13:26:43 2013
@@ -51,6 +51,7 @@ import org.apache.accumulo.server.proble
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
@@ -305,10 +306,10 @@ public class FileManager {
     // open any files that need to be opened
     for (String file : filesToOpen) {
       try {
-        // log.debug("Opening "+file);
-        String path = fs.getFullPath(ServerConstants.getTablesDirs(), file);
+        Path path = fs.getFullPath(ServerConstants.getTablesDirs(), file);
         org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
-        FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
+        //log.debug("Opening "+file + " path " + path);
+        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
             dataCache, indexCache);
         reservedFiles.add(reader);
         readersReserved.put(reader, file);

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Mon Jun 17 13:26:43 2013
@@ -771,7 +771,7 @@ public class Tablet {
       mergingMinorCompactionFile = null;
     }
     
-    void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) {
+    void bringMinorCompactionOnline(FileRef tmpDatafile, FileRef newDatafile, FileRef absMergeFile, DataFileValue dfv, CommitSession commitSession, long flushId) throws IOException {
       
       IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
       if (extent.isRootTablet()) {
@@ -1137,7 +1137,7 @@ public class Tablet {
           break;
         }
         
-        FileRef ref = new FileRef(entry.getKey().getColumnQualifier().toString(), new Path(fs.getFullPath(entry.getKey())));
+        FileRef ref = new FileRef(entry.getKey().getColumnQualifier().toString(), fs.getFullPath(entry.getKey()));
         datafiles.put(ref, new DataFileValue(entry.getValue().get()));
       }
     }
@@ -1178,8 +1178,8 @@ public class Tablet {
       Key key = entry.getKey();
       if (key.getRow().equals(row) && key.getColumnFamily().equals(Constants.METADATA_SCANFILE_COLUMN_FAMILY)) {
         String meta = key.getColumnQualifier().toString();
-        String path = fs.getFullPath(ServerConstants.getTablesDirs(), meta);
-        scanFiles.add(new FileRef(meta, new Path(path)));
+        Path path = fs.getFullPath(ServerConstants.getTablesDirs(), meta);
+        scanFiles.add(new FileRef(meta, path));
       }
     }
     
@@ -1376,7 +1376,8 @@ public class Tablet {
       for (LogEntry logEntry : logEntries) {
         for (String log : logEntry.logSet) {
           String[] parts = log.split("/", 2);
-          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, parts[1]));
+          Path file = fs.getFullPath(ServerConstants.getWalDirs(), parts[1]);
+          currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.server, file));
         }
       }
       
@@ -2061,9 +2062,9 @@ public class Tablet {
           commitSession, flushId);
       span.stop();
       return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten());
-    } catch (RuntimeException E) {
+    } catch (Exception E) {
       failed = true;
-      throw E;
+      throw new RuntimeException(E);
     } catch (Error E) {
       // Weird errors like "OutOfMemoryError" when trying to create the thread for the compaction
       failed = true;

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Mon Jun 17 13:26:43 2013
@@ -2054,10 +2054,10 @@ public class TabletServer extends Abstra
     public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
       String myname = getClientAddressString();
       myname = myname.replace(':', '+');
-      Path logDir = new Path(Constants.getWalDirectory(acuConf), myname);
       Set<String> loggers = new HashSet<String>();
       logger.getLoggers(loggers);
       nextFile: for (String filename : filenames) {
+        // skip any log we're currently using
         for (String logger : loggers) {
           if (logger.contains(filename))
             continue nextFile;
@@ -2074,28 +2074,30 @@ public class TabletServer extends Abstra
             }
           }
         }
+        
         try {
-          String source = logDir + "/" + filename;
+          Path source = new Path(filename);
           if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
-            String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive";
-            fs.mkdirs(new Path(walogArchive));
-            String dest = walogArchive + "/" + filename;
+            Path walogArchive = fs.matchingFileSystem(source, ServerConstants.getWalogArchives());
+            fs.mkdirs(walogArchive);
+            Path dest = new Path(walogArchive, source.getName());
             log.info("Archiving walog " + source + " to " + dest);
-            if (!fs.rename(new Path(source), new Path(dest)))
+            if (!fs.rename(source, dest))
               log.error("rename is unsuccessful");
           } else {
             log.info("Deleting walog " + filename);
-            Path sourcePath = new Path(source);
+            Path sourcePath = new Path(filename);
             if (!fs.moveToTrash(sourcePath) && !fs.deleteRecursively(sourcePath))
               log.warn("Failed to delete walog " + source);
-            Path recoveryPath = new Path(Constants.getRecoveryDir(acuConf), filename);
-            try {
-              if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath))
-                log.info("Deleted any recovery log " + filename);
-            } catch (FileNotFoundException ex) {
-              // ignore
+            for (String recovery : ServerConstants.getRecoveryDirs()) {
+              Path recoveryPath = new Path(recovery, source.getName());
+              try {
+                if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath))
+                  log.info("Deleted any recovery log " + filename);
+              } catch (FileNotFoundException ex) {
+                // ignore
+              }
             }
-            
           }
         } catch (IOException e) {
           log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
@@ -3243,7 +3245,7 @@ public class TabletServer extends Abstra
   }
   
   public void recover(FileSystem fs, Tablet tablet, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
-    List<String> recoveryLogs = new ArrayList<String>();
+    List<Path> recoveryLogs = new ArrayList<Path>();
     List<LogEntry> sorted = new ArrayList<LogEntry>(logEntries);
     Collections.sort(sorted, new Comparator<LogEntry>() {
       @Override
@@ -3252,14 +3254,13 @@ public class TabletServer extends Abstra
       }
     });
     for (LogEntry entry : sorted) {
-      String recovery = null;
+      Path recovery = null;
       for (String log : entry.logSet) {
         String[] parts = log.split("/"); // "host:port/filename"
-        log = fs.getFullPath(ServerConstants.getRecoveryDirs(), parts[1]);
-        Path finished = new Path(log + "/finished");
+        Path finished = new Path(fs.getFullPath(ServerConstants.getRecoveryDirs(), parts[parts.length - 1]), "finished");
         TabletServer.log.info("Looking for " + finished);
         if (fs.exists(finished)) {
-          recovery = log;
+          recovery = finished.getParent();
           break;
         }
       }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Mon Jun 17 13:26:43 2013
@@ -31,18 +31,19 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
@@ -211,10 +212,10 @@ public class DfsLogger {
     this.conf = conf;
   }
   
-  public DfsLogger(ServerResources conf, String logger, String filename) throws IOException {
+  public DfsLogger(ServerResources conf, String logger, Path filename) throws IOException {
     this.conf = conf;
     this.logger = logger;
-    this.logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()), filename);
+    this.logPath = filename;
   }
   
   public static FSDataInputStream readHeader(FileSystem fs, Path path, Map<String,String> opts) throws IOException {
@@ -241,13 +242,17 @@ public class DfsLogger {
     }
   }
   
+  // TODO: ACCUMULO-118
+  static final Random random = new Random();
+  
   public synchronized void open(String address) throws IOException {
     String filename = UUID.randomUUID().toString();
     logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
     
     log.debug("DfsLogger.open() begin");
+    String[] wals = ServerConstants.getWalDirs();
     
-    logPath = new Path(Constants.getWalDirectory(conf.getConfiguration()) + "/" + logger + "/" + filename);
+    logPath = new Path(wals[random.nextInt(wals.length)] + "/" + logger + "/" + filename);
     try {
       FileSystem fs = conf.getFileSystem();
       short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
@@ -334,7 +339,7 @@ public class DfsLogger {
   }
   
   public String getFileName() {
-    return logPath.getName();
+    return logPath.toString();
   }
   
   public void close() throws IOException {

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Mon Jun 17 13:26:43 2013
@@ -74,21 +74,24 @@ public class LogSorter {
     
     @Override
     public void process(String child, byte[] data) {
-      String dest = Constants.getRecoveryDir(conf) + "/" + child;
-      String src = new String(data);
-      String name = new Path(src).getName();
+      String work = new String(data);
+      String[] parts = work.split("\\|");
+      String src = parts[0];
+      String dest = parts[1];
+      String sortId = new Path(src).getName();
+      log.debug("Sorting " + src + " to " + dest + " using sortId " + sortId);
       
       synchronized (currentWork) {
-        if (currentWork.containsKey(name))
+        if (currentWork.containsKey(sortId))
           return;
-        currentWork.put(name, this);
+        currentWork.put(sortId, this);
       }
       
       try {
         log.info("Copying " + src + " to " + dest);
-        sort(name, new Path(src), dest);
+        sort(sortId, new Path(src), dest);
       } finally {
-        currentWork.remove(name);
+        currentWork.remove(sortId);
       }
       
     }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java Mon Jun 17 13:26:43 2013
@@ -86,9 +86,9 @@ public class MultiReader {
   
   private PriorityBuffer heap = new PriorityBuffer();
   
-  public MultiReader(FileSystem fs, String directory) throws IOException {
+  public MultiReader(FileSystem fs, Path directory) throws IOException {
     boolean foundFinish = false;
-    for (FileStatus child : fs.listStatus(new Path(directory))) {
+    for (FileStatus child : fs.listStatus(directory)) {
       if (child.getPath().getName().startsWith("_"))
         continue;
       if (child.getPath().getName().equals("finished")) {

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java Mon Jun 17 13:26:43 2013
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
 /**
@@ -88,11 +89,11 @@ public class SortedLogRecovery {
     }
   }
   
-  public void recover(KeyExtent extent, List<String> recoveryLogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
+  public void recover(KeyExtent extent, List<Path> recoveryLogs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
     int[] tids = new int[recoveryLogs.size()];
     LastStartToFinish lastStartToFinish = new LastStartToFinish();
     for (int i = 0; i < recoveryLogs.size(); i++) {
-      String logfile = recoveryLogs.get(i);
+      Path logfile = recoveryLogs.get(i);
       log.info("Looking at mutations from " + logfile + " for " + extent);
       MultiReader reader = new MultiReader(fs, logfile);
       try {
@@ -119,7 +120,7 @@ public class SortedLogRecovery {
       throw new RuntimeException("COMPACTION_FINISH (without preceding COMPACTION_START) not followed by successful minor compaction");
     
     for (int i = 0; i < recoveryLogs.size(); i++) {
-      String logfile = recoveryLogs.get(i);
+      Path logfile = recoveryLogs.get(i);
       MultiReader reader = new MultiReader(fs, logfile);
       try {
         playbackMutations(reader, tids[i], lastStartToFinish, mr);
@@ -188,6 +189,7 @@ public class SortedLogRecovery {
         lastStartToFinish.update(fileno, key.seq);
         
         // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table.
+        log.error("filename in compaction start " + key.filename);
         if (tabletFiles.contains(key.filename))
           lastStartToFinish.update(-1);
       } else if (key.event == COMPACTION_FINISH) {

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Mon Jun 17 13:26:43 2013
@@ -40,6 +40,7 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.tabletserver.TabletMutations;
 import org.apache.accumulo.server.tabletserver.TabletServer;
 import org.apache.accumulo.server.tabletserver.log.DfsLogger.LoggerOperation;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
 /**
@@ -414,7 +415,7 @@ public class TabletServerLogger {
     return seq;
   }
   
-  public void recover(FileSystem fs, Tablet tablet, List<String> logs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
+  public void recover(FileSystem fs, Tablet tablet, List<Path> logs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
     if (!enabled(tablet))
       return;
     try {

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java Mon Jun 17 13:26:43 2013
@@ -503,6 +503,7 @@ public class Initialize {
       SecurityUtil.serverLogin();
       Configuration conf = CachedConfiguration.getInstance();
       
+      @SuppressWarnings("deprecation")
       FileSystem fs = FileSystemImpl.get(SiteConfiguration.getSiteConfiguration());
       
       if (opts.resetSecurity) {

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java Mon Jun 17 13:26:43 2013
@@ -61,7 +61,7 @@ public class LocalityCheck {
         files.clear();
       } else if (key.compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {
         
-        files.add(fs.getFullPath(key));
+        files.add(fs.getFullPath(key).toString());
       }
     }
     System.out.println(" Server         %local  total blocks");

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java Mon Jun 17 13:26:43 2013
@@ -161,7 +161,7 @@ public class MetadataTable extends org.a
         boolean foundEntry = false;
         for (String entry : unusedWalLogs) {
           String[] parts = entry.split("/");
-          String zpath = root + "/" + parts[1];
+          String zpath = root + "/" + parts[parts.length - 1];
           while (true) {
             try {
               if (zk.exists(zpath)) {
@@ -427,12 +427,12 @@ public class MetadataTable extends org.a
   }
   
   public static void replaceDatafiles(KeyExtent extent, Set<FileRef> datafilesToDelete, Set<FileRef> scanFiles, FileRef path, Long compactionId,
-      DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock) {
+      DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock) throws IOException {
     replaceDatafiles(extent, datafilesToDelete, scanFiles, path, compactionId, size, credentials, address, lastLocation, zooLock, true);
   }
   
   public static void replaceDatafiles(KeyExtent extent, Set<FileRef> datafilesToDelete, Set<FileRef> scanFiles, FileRef path, Long compactionId,
-      DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock, boolean insertDeleteFlags) {
+      DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock, boolean insertDeleteFlags) throws IOException {
     
     if (insertDeleteFlags) {
       // add delete flags for those paths before the data file reference is removed
@@ -464,30 +464,32 @@ public class MetadataTable extends org.a
     update(credentials, zooLock, m);
   }
   
-  public static void addDeleteEntries(KeyExtent extent, Set<FileRef> datafilesToDelete, TCredentials credentials) {
+  public static void addDeleteEntries(KeyExtent extent, Set<FileRef> datafilesToDelete, TCredentials credentials) throws IOException {
     
     String tableId = extent.getTableId().toString();
     
     // TODO could use batch writer,would need to handle failure and retry like update does - ACCUMULO-1294
-    for (FileRef pathToRemove : datafilesToDelete)
-      update(credentials, createDeleteMutation(tableId, pathToRemove.meta().toString()));
+    for (FileRef pathToRemove : datafilesToDelete) {
+      update(credentials, createDeleteMutation(tableId, pathToRemove.path().toString()));
+    }
   }
   
-  public static void addDeleteEntry(String tableId, String path) {
+  public static void addDeleteEntry(String tableId, String path) throws IOException {
     update(SecurityConstants.getSystemCredentials(), createDeleteMutation(tableId, path));
   }
   
-  public static Mutation createDeleteMutation(String tableId, String pathToRemove) {
-    Mutation delFlag;
+  public static Mutation createDeleteMutation(String tableId, String pathToRemove) throws IOException {
     String prefix = Constants.METADATA_DELETE_FLAG_PREFIX;
     if (tableId.equals(Constants.METADATA_TABLE_ID))
       prefix = Constants.METADATA_DELETE_FLAG_FOR_METADATA_PREFIX;
 
     if (pathToRemove.startsWith("../"))
-      delFlag = new Mutation(new Text(prefix + pathToRemove.substring(2)));
+      pathToRemove = pathToRemove.substring(2);
     else
-      delFlag = new Mutation(new Text(prefix + "/" + tableId + pathToRemove));
+      pathToRemove = "/" + tableId + "/" + pathToRemove;
 
+    Path path = FileSystemImpl.get().getFullPath(ServerConstants.getTablesDirs(), pathToRemove);
+    Mutation delFlag = new Mutation(new Text(prefix + path.toString()));
     delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
     return delFlag;
   }
@@ -634,7 +636,7 @@ public class MetadataTable extends org.a
     return fixSplit(table, metadataEntry, metadataPrevEndRow, oper, splitRatio, tserver, credentials, time.toString(), initFlushID, initCompactID, lock);
   }
   
-  public static void deleteTable(String tableId, boolean insertDeletes, TCredentials credentials, ZooLock lock) throws AccumuloException {
+  public static void deleteTable(String tableId, boolean insertDeletes, TCredentials credentials, ZooLock lock) throws AccumuloException, IOException {
     Scanner ms = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS);
     Text tableIdText = new Text(tableId);
     BatchWriter bw = new BatchWriterImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID, new BatchWriterConfig().setMaxMemory(1000000)
@@ -654,10 +656,8 @@ public class MetadataTable extends org.a
         Key key = cell.getKey();
         
         if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
-          String relPath = key.getColumnQualifier().toString();
-          // only insert deletes for files owned by this table
-          if (!relPath.startsWith("../"))
-            bw.addMutation(createDeleteMutation(tableId, relPath));
+          FileRef ref = new FileRef(FileSystemImpl.get(), key);
+          bw.addMutation(createDeleteMutation(tableId, ref.meta().toString()));
         }
         
         if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
@@ -716,7 +716,7 @@ public class MetadataTable extends org.a
       extent.write(out);
       out.writeLong(timestamp);
       out.writeUTF(server);
-      out.writeUTF(filename);
+      out.writeUTF(filename.toString());
       out.write(tabletId);
       out.write(logSet.size());
       for (String s : logSet) {
@@ -753,8 +753,11 @@ public class MetadataTable extends org.a
       while (true) {
         try {
           IZooReaderWriter zoo = ZooReaderWriter.getInstance();
-          if (zoo.isLockHeld(zooLock.getLockID()))
-            zoo.putPersistentData(root + "/" + entry.filename, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
+          if (zoo.isLockHeld(zooLock.getLockID())) {
+            String[] parts = entry.filename.split("/");
+            String uniqueId = parts[parts.length - 1];
+            zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
+          }
           break;
         } catch (KeeperException e) {
           log.error(e, e);
@@ -776,7 +779,7 @@ public class MetadataTable extends org.a
   public static LogEntry entryFromKeyValue(Key key, Value value) {
     MetadataTable.LogEntry e = new MetadataTable.LogEntry();
     e.extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
-    String[] parts = key.getColumnQualifier().toString().split("/");
+    String[] parts = key.getColumnQualifier().toString().split("/", 2);
     e.server = parts[0];
     e.filename = parts[1];
     parts = value.toString().split("\\|");
@@ -905,6 +908,8 @@ public class MetadataTable extends org.a
       try {
         Scanner scanner = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), CredentialHelper.extractToken(creds))
             .createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+        log.info("Setting range to " + Constants.NON_ROOT_METADATA_KEYSPACE);
+        scanner.setRange(Constants.NON_ROOT_METADATA_KEYSPACE);
         scanner.fetchColumnFamily(Constants.METADATA_LOG_COLUMN_FAMILY);
         metadataEntries = scanner.iterator();
       } catch (Exception ex) {
@@ -923,6 +928,7 @@ public class MetadataTable extends org.a
         return rootTabletEntries.next();
       }
       Entry<Key,Value> entry = metadataEntries.next();
+      log.info("entry " + entry + " in range " + Constants.NON_ROOT_METADATA_KEYSPACE.contains(entry.getKey()));
       return entryFromKeyValue(entry.getKey(), entry.getValue());
     }
     

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java?rev=1493756&r1=1493755&r2=1493756&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java Mon Jun 17 13:26:43 2013
@@ -151,7 +151,7 @@ public class OfflineMetadataScanner exte
     
     while (ssi.hasTop()) {
       if (ssi.getTopKey().compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {
-        allFiles.add(fs.getFullPath(ssi.getTopKey()));
+        allFiles.add(fs.getFullPath(ssi.getTopKey()).toString());
       } else {
         walogs++;
       }



Mime
View raw message