accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1489969 [1/4] - in /accumulo/branches/ACCUMULO-118: core/src/main/java/org/apache/accumulo/core/util/ proxy/src/test/java/org/apache/accumulo/proxy/ server/src/main/java/org/apache/accumulo/server/ server/src/main/java/org/apache/accumulo/...
Date Wed, 05 Jun 2013 17:19:26 GMT
Author: ecn
Date: Wed Jun  5 17:19:25 2013
New Revision: 1489969

URL: http://svn.apache.org/r1489969
Log:
ACCUMULO-118 support multiple namespaces for tables

Added:
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java   (with props)
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java   (with props)
Removed:
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/CountRowKeys.java
Modified:
    accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java
    accumulo/branches/ACCUMULO-118/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletIteratorEnvironment.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/CheckForMetadataProblems.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
    accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
    accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java
    accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
    accumulo/branches/ACCUMULO-118/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java

Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java (original)
+++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/util/TableDiskUsage.java Wed Jun  5 17:19:25 2013
@@ -81,7 +81,7 @@ public class TableDiskUsage {
   }
   
   Map<List<String>,Long> calculateUsage() {
-    
+
     Map<List<Integer>,Long> usage = new HashMap<List<Integer>,Long>();
     
     for (Entry<String,Integer[]> entry : tableFiles.entrySet()) {
@@ -138,6 +138,8 @@ public class TableDiskUsage {
     HashSet<String> tablesReferenced = new HashSet<String>(tableIds);
     HashSet<String> emptyTableIds = new HashSet<String>();
     
+    final String TABLES = Constants.getTablesDir(acuConf);
+
     for (String tableId : tableIds) {
       Scanner mdScanner = null;
       try {
@@ -152,20 +154,24 @@ public class TableDiskUsage {
         emptyTableIds.add(tableId);
       }
       
+      // TODO ACCUMULO-118 there are multiple table locations/filesystems
       for (Entry<Key,Value> entry : mdScanner) {
         String file = entry.getKey().getColumnQualifier().toString();
-        if (file.startsWith("../")) {
+        if (file.contains(":")) {
+          file = file.substring(file.indexOf(TABLES) + TABLES.length());
+        } else if (file.startsWith("../")) {
           file = file.substring(2);
           tablesReferenced.add(file.split("\\/")[1]);
-        } else
+        } else {
           file = "/" + tableId + file;
+        }
         
         tdu.linkFileAndTable(tableId, file);
       }
     }
     
     for (String tableId : tablesReferenced) {
-      FileStatus[] files = fs.globStatus(new Path(Constants.getTablesDir(acuConf) + "/" + tableId + "/*/*"));
+      FileStatus[] files = fs.globStatus(new Path(TABLES + "/" + tableId + "/*/*"));
       
       for (FileStatus fileStatus : files) {
         String dir = fileStatus.getPath().getParent().getName();

Modified: accumulo/branches/ACCUMULO-118/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java (original)
+++ accumulo/branches/ACCUMULO-118/proxy/src/test/java/org/apache/accumulo/proxy/SimpleTest.java Wed Jun  5 17:19:25 2013
@@ -982,7 +982,7 @@ public class SimpleTest {
     assertEquals("10", new String(more.getResults().get(0).getValue()));
     try {
       client.checkIteratorConflicts(creds, TABLE_TEST, setting, EnumSet.allOf(IteratorScope.class));
-      fail("checkIteratorConflicts did not throw and exception");
+      fail("checkIteratorConflicts did not throw an exception");
     } catch (Exception ex) {}
     client.deleteRows(creds, TABLE_TEST, null, null);
     client.removeIterator(creds, TABLE_TEST, "test", EnumSet.allOf(IteratorScope.class));
@@ -1141,6 +1141,6 @@ public class SimpleTest {
   @AfterClass
   public static void tearDownMiniCluster() throws Exception {
     accumulo.stop();
-    folder.delete();
+    //folder.delete();
   }
 }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/ServerConstants.java Wed Jun  5 17:19:25 2013
@@ -24,32 +24,49 @@ import static org.apache.accumulo.core.C
 
 public class ServerConstants {
   // these are functions to delay loading the Accumulo configuration unless we must
-  public static String getBaseDir() {
-    return ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
+  public static String[] getBaseDirs() {
+    String singleNamespace = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR);
+    String ns = ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_NAMESPACES);
+    if (ns == null) {
+      return new String[] { singleNamespace };
+    }
+    String namespaces[] = ns.split(",");
+    if (namespaces.length < 2) {
+      return new String[] { singleNamespace };
+    }
+    return prefix(namespaces, singleNamespace);
+  }
+  
+  public static String[] prefix(String bases[], String suffix) {
+    String result[] = new String[bases.length];
+    for (int i = 0; i < bases.length; i++) {
+      result[i] = bases[i] + "/" + suffix;
+    }
+    return result;
   }
   
-  public static String getTablesDir() {
-    return getBaseDir() + "/tables";
+  public static String[] getTablesDirs() {
+    return prefix(getBaseDirs(), "tables");
   }
   
-  public static String getRecoveryDir() {
-    return getBaseDir() + "/recovery";
+  public static String[] getRecoveryDirs() {
+    return prefix(getBaseDirs(), "recovery");
   }
   
   public static Path getInstanceIdLocation() {
-    return new Path(getBaseDir() + "/instance_id");
+    return new Path(ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR) + "/instance_id");
   }
   
   public static Path getDataVersionLocation() {
-    return new Path(getBaseDir() + "/version");
+    return new Path(ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_DFS_DIR) + "/version");
   }
   
-  public static String getMetadataTableDir() {
-    return getTablesDir() + "/" + METADATA_TABLE_ID;
+  public static String[] getMetadataTableDirs() {
+    return prefix(getTablesDirs(), METADATA_TABLE_ID);
   }
   
   public static String getRootTabletDir() {
-    return getMetadataTableDir() + ZROOT_TABLET;
+    return prefix(getMetadataTableDirs(), ZROOT_TABLET)[0];
   }
   
 }

Added: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java?rev=1489969&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java (added)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java Wed Jun  5 17:19:25 2013
@@ -0,0 +1,63 @@
+package org.apache.accumulo.server.fs;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+
+/**
+ * This is a glue object, to convert short file references to long references.
+ * The !METADATA table may contain old relative file references.  This class keeps 
+ * track of the short file reference, so it can be removed properly from the !METADATA table.
+ */
+public class FileRef implements Comparable<FileRef> {
+  String metaReference;  // something like ../2/d-00000/A00001.rf
+  Path fullReference;  // something like hdfs://nn:9001/accumulo/tables/2/d-00000/A00001.rf
+  
+  public FileRef(FileSystem fs, Key key) {
+    metaReference = key.getColumnQualifier().toString();
+    fullReference = new Path(fs.getFullPath(key));
+  }
+  
+  public FileRef(String metaReference, Path fullReference) {
+    this.metaReference = metaReference;
+    this.fullReference = fullReference;
+  }
+  
+  public FileRef(String path) {
+    this.metaReference = path;
+    this.fullReference = new Path(path);
+  }
+  
+  public String toString() {
+    return fullReference.toString();
+  }
+  
+  public Path path() {
+    return fullReference;
+  }
+  
+  public Text meta() {
+    return new Text(metaReference);
+  }
+
+  @Override
+  public int compareTo(FileRef o) {
+    return path().compareTo(o.path());
+  }
+
+  @Override
+  public int hashCode() {
+    return path().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof FileRef) {
+      return compareTo((FileRef)obj) == 0;
+    }
+    return false;
+  }
+  
+  
+}

Propchange: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileRef.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java Wed Jun  5 17:19:25 2013
@@ -1,64 +1,103 @@
 package org.apache.accumulo.server.fs;
 
 import java.io.IOException;
-import java.util.Collection;
+import java.util.Map;
 
 import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
+/**
+ * A wrapper around multiple hadoop FileSystem objects, which are assumed to be different namespaces.
+ */
 public interface FileSystem {
   
+  // close the underlying FileSystems
   void close() throws IOException;
   
+  // the mechanism by which the master ensures that tablet servers can no longer write to a WAL
   boolean closePossiblyOpenFile(Path path) throws IOException;
   
+  // forward to the appropriate FileSystem object
   FSDataOutputStream create(Path dest) throws IOException;
   
+  // forward to the appropriate FileSystem object
   FSDataOutputStream create(Path path, boolean b) throws IOException;
   
+  // forward to the appropriate FileSystem object
   FSDataOutputStream create(Path path, boolean b, int int1, short int2, long long1) throws IOException;
   
+  // create a file, but only if it doesn't exist
   boolean createNewFile(Path writable) throws IOException;
   
+  // create a file which can be sync'd to disk
   FSDataOutputStream createSyncable(Path logPath, int buffersize, short replication, long blockSize) throws IOException;
   
+  // delete a file
   boolean delete(Path path) throws IOException;
   
+  // delete a directory and anything under it
   boolean deleteRecursively(Path path) throws IOException;
   
-  boolean exists(Path newBulkDir) throws IOException;
+  // forward to the appropriate FileSystem object
+  boolean exists(Path path) throws IOException;
   
-  FileStatus getFileStatus(Path errorPath) throws IOException;
+  // forward to the appropriate FileSystem object
+  FileStatus getFileStatus(Path path) throws IOException;
   
+  // find the appropriate FileSystem object given a path
   org.apache.hadoop.fs.FileSystem getFileSystemByPath(Path path);
   
   org.apache.hadoop.fs.FileSystem getFileSystemByPath(String path);
   
-  Collection<org.apache.hadoop.fs.FileSystem> getFileSystems();
+  // get a mapping of namespace to FileSystem
+  Map<String, ? extends org.apache.hadoop.fs.FileSystem> getFileSystems();
   
+  // return the item in options that is in the same namespace as source
+  Path matchingFileSystem(Path source, String[] options);
+  
+  // create a new path in the same namespace as the sourceDir
+  String newPathOnSameNamespace(String sourceDir, String suffix);
+  
+  // forward to the appropriate FileSystem object
   FileStatus[] listStatus(Path path) throws IOException;
   
+  // forward to the appropriate FileSystem object
   boolean mkdirs(Path directory) throws IOException;
   
+  // forward to the appropriate FileSystem object
   FSDataInputStream open(Path path) throws IOException;
   
+  // forward to the appropriate FileSystem object, throws an exception if the paths are in different namespaces
   boolean rename(Path path, Path newPath) throws IOException;
   
+  // forward to the appropriate FileSystem object
   boolean moveToTrash(Path sourcePath) throws IOException;
   
+  // forward to the appropriate FileSystem object
   short getDefaultReplication(Path logPath);
   
+  // forward to the appropriate FileSystem object
   boolean isFile(Path path) throws IOException;
   
+  // all namespaces are ready to provide service (not in SafeMode, for example)
   boolean isReady() throws IOException;
   
+  // ambiguous references to files go here
   org.apache.hadoop.fs.FileSystem getDefaultNamespace();
   
+  // forward to the appropriate FileSystem object
   FileStatus[] globStatus(Path path) throws IOException;
 
+  // Convert a file or directory !METADATA reference into a path
   String getFullPath(Key key);
   
+  // Given a filename, figure out the qualified path given multiple namespaces
+  String getFullPath(String paths[], String fileName) throws IOException;
+
+  // forward to the appropriate FileSystem object
+  ContentSummary getContentSummary(String dir);
 }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java Wed Jun  5 17:19:25 2013
@@ -4,7 +4,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -20,6 +20,7 @@ import org.apache.accumulo.server.client
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -186,7 +187,7 @@ public class FileSystemImpl implements o
   }
 
   private void ensureSyncIsEnabled() {
-    for (FileSystem fs : getFileSystems()) {
+    for (FileSystem fs : getFileSystems().values()) {
       if (fs instanceof DistributedFileSystem) {
         if (!fs.getConf().getBoolean("dfs.durable.sync", false) && !fs.getConf().getBoolean("dfs.support.append", false)) {
           String msg = "Must set dfs.durable.sync OR dfs.support.append to true.  Which one needs to be set depends on your version of HDFS.  See ACCUMULO-623. \n"
@@ -230,7 +231,15 @@ public class FileSystemImpl implements o
 
   @Override
   public FileSystem getFileSystemByPath(Path path) {
-    log.info("Looking up namespace on " + path);
+    if (path.isAbsolute())
+    {
+      try {
+        return path.getFileSystem(CachedConfiguration.getInstance());
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+      
     return namespaces.get(defaultNamespace);
   }
 
@@ -240,8 +249,8 @@ public class FileSystemImpl implements o
   }
 
   @Override
-  public Collection<FileSystem> getFileSystems() {
-    return new ArrayList<FileSystem>(namespaces.values());
+  public Map<String, ? extends org.apache.hadoop.fs.FileSystem>  getFileSystems() {
+    return namespaces;
   }
 
   @Override
@@ -293,7 +302,7 @@ public class FileSystemImpl implements o
 
   @Override
   public boolean isReady() throws IOException {
-    for (FileSystem fs : getFileSystems()) {
+    for (FileSystem fs : getFileSystems().values()) {
       if (!(fs instanceof DistributedFileSystem))
         continue;
       DistributedFileSystem dfs = (DistributedFileSystem)fs;
@@ -341,11 +350,14 @@ public class FileSystemImpl implements o
   public FileStatus[] globStatus(Path pathPattern) throws IOException {
     return getFileSystemByPath(pathPattern).globStatus(pathPattern);
   }
-
+  
   @Override
   public String getFullPath(Key key) {
     
     String relPath = key.getColumnQualifierData().toString();
+    if (relPath.contains(":"))
+      return relPath;
+   
     byte [] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
     
     if (relPath.startsWith("../"))
@@ -354,7 +366,52 @@ public class FileSystemImpl implements o
       relPath = "/" + new String(tableId) + relPath;
     String fullPath = Constants.getTablesDir(conf) + relPath;
     FileSystem ns = getFileSystemByPath(fullPath);
-    return ns.makeQualified(new Path(fullPath)).toString();
+    String result = ns.makeQualified(new Path(fullPath)).toString();
+    return result;
+  }
+
+  @Override
+  public Path matchingFileSystem(Path source, String[] options) {
+    for (String fs : getFileSystems().keySet()) {
+      for (String option : options) {
+        if (option.startsWith(fs))
+          return new Path(option);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public String newPathOnSameNamespace(String sourceDir, String suffix) {
+    for (String fs : getFileSystems().keySet()) {
+        if (sourceDir.startsWith(fs)) {
+          return fs + "/" + suffix;
+        }
+    }
+    return null;
+  }
+
+  @Override
+  public String getFullPath(String[] paths, String fileName) throws IOException {
+    if (fileName.contains(":"))
+      return fileName;
+    // old-style name, on one of many possible "root" paths:
+    if (fileName.startsWith("../"))
+      fileName = fileName.substring(2);
+    for (String path : paths) {
+      String fullPath = path + fileName;
+      FileSystem ns = getFileSystemByPath(fullPath);
+      Path exists = new Path(fullPath);
+      if (ns.exists(exists))
+        return ns.makeQualified(exists).toString();
+    }
+    throw new RuntimeException("Could not find file " + fileName + " in " + Arrays.asList(paths));
+  }
+
+  @Override
+  public ContentSummary getContentSummary(String dir) {
+    // TODO Auto-generated method stub
+    return null;
   }
 
 }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Wed Jun  5 17:19:25 2013
@@ -358,21 +358,22 @@ public class SimpleGarbageCollector impl
       // if dir exist and is empty, then empty list is returned...
       // hadoop 1.0 will return null if the file doesn't exist
       // hadoop 2.0 will throw an exception if the file does not exist
-      FileStatus[] tabletDirs = null;
-      try {
-        tabletDirs = fs.listStatus(new Path(ServerConstants.getTablesDir() + "/" + delTableId));
-      } catch (FileNotFoundException ex) {
-        // ignored 
-      }
-      
-      if (tabletDirs == null)
-        continue;
-      
-      if (tabletDirs.length == 0) {
-        Path p = new Path(ServerConstants.getTablesDir() + "/" + delTableId);
-        if (!moveToTrash(p)) 
-          fs.delete(p);
-      }
+      for (String dir : ServerConstants.getTablesDirs()) {
+        FileStatus[] tabletDirs = null;
+        try {
+          tabletDirs = fs.listStatus(new Path(dir + "/" + delTableId));
+        } catch (FileNotFoundException ex) {
+          // ignored 
+        }
+        if (tabletDirs == null)
+          continue;
+        
+        if (tabletDirs.length == 0) {
+          Path p = new Path(dir + "/" + delTableId);
+          if (!moveToTrash(p)) 
+            fs.delete(p);
+        }
+      } 
     }
   }
   
@@ -430,10 +431,12 @@ public class SimpleGarbageCollector impl
       checkForBulkProcessingFiles = true;
       try {
         for (String validExtension : FileOperations.getValidExtensions()) {
-          for (FileStatus stat : fs.globStatus(new Path(ServerConstants.getTablesDir() + "/*/*/*." + validExtension))) {
-            String cand = stat.getPath().toUri().getPath();
-            if (!cand.contains(ServerConstants.getRootTabletDir())) {
-              candidates.add(cand.substring(ServerConstants.getTablesDir().length()));
+          for (String dir : ServerConstants.getTablesDirs()) {
+            for (FileStatus stat : fs.globStatus(new Path(dir + "/*/*/*." + validExtension))) {
+              String cand = stat.getPath().toUri().getPath();
+              if (cand.contains(ServerConstants.getRootTabletDir()))
+                continue;
+              candidates.add(cand.substring(dir.length()));
               log.debug("Offline candidate: " + cand);
             }
           }
@@ -658,9 +661,9 @@ public class SimpleGarbageCollector impl
         public void run() {
           boolean removeFlag;
           
-          String fullPath = ServerConstants.getTablesDir() + delete;
-          log.debug("Deleting " + fullPath);
           try {
+            String fullPath = fs.getFullPath(ServerConstants.getTablesDirs(), delete);
+            log.debug("Deleting " + fullPath);
             
             Path p = new Path(fullPath);
             

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java Wed Jun  5 17:19:25 2013
@@ -96,6 +96,7 @@ import org.apache.accumulo.fate.zookeepe
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.fs.FileSystemImpl;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
@@ -1571,11 +1572,11 @@ public class Master implements LiveTServ
         Constants.METADATA_TIME_COLUMN.fetch(scanner);
         scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
         scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
-        Set<String> datafiles = new TreeSet<String>();
+        Set<FileRef> datafiles = new TreeSet<FileRef>();
         for (Entry<Key,Value> entry : scanner) {
           Key key = entry.getKey();
           if (key.compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {
-            datafiles.add(key.getColumnQualifier().toString());
+            datafiles.add(new FileRef(fs, key));
             if (datafiles.size() > 1000) {
               MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
               datafiles.clear();
@@ -1585,7 +1586,7 @@ public class Master implements LiveTServ
           } else if (key.compareColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) == 0) {
             throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
           } else if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
-            datafiles.add(entry.getValue().toString());
+            datafiles.add(new FileRef(fs, key));
             if (datafiles.size() > 1000) {
               MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
               datafiles.clear();

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Wed Jun  5 17:19:25 2013
@@ -35,8 +35,6 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import org.apache.accumulo.trace.instrument.TraceExecutorService;
-import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
@@ -63,6 +61,7 @@ import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
@@ -71,10 +70,11 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.accumulo.trace.instrument.TraceExecutorService;
+import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.Text;
@@ -180,7 +180,22 @@ public class BulkImport extends MasterRe
   }
   
   private Path createNewBulkDir(FileSystem fs, String tableId) throws IOException {
-    Path directory = new Path(ServerConstants.getTablesDir() + "/" + tableId);
+    String tableDir = null;
+    loop:
+    for (String dir : fs.getFileSystems().keySet()) {
+      if (this.sourceDir.startsWith(dir)) {
+        for (String path : ServerConstants.getTablesDirs()) {
+          if (path.startsWith(dir)) {
+            tableDir = path;
+            break loop;
+          }
+        }
+        break;
+      }
+    }
+    if (tableDir == null)
+      throw new IllegalStateException(sourceDir + " is not in a known namespace");
+    Path directory = new Path(tableDir + "/" + tableId);
     fs.mkdirs(directory);
     
     // only one should be able to create the lock file

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java Wed Jun  5 17:19:25 2013
@@ -27,25 +27,21 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.util.TablePropUtil;
 import org.apache.accumulo.server.util.TabletOperations;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
@@ -148,18 +144,19 @@ class CreateDir extends MasterRepo {
   }
   
   @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
-    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration()));
-    String dir = ServerConstants.getTablesDir() + "/" + tableInfo.tableId;
-    TabletOperations.createTabletDirectory(fs, dir, null);
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    FileSystem fs = master.getFileSystem();
+    TabletOperations.createTabletDirectory(fs, tableInfo.tableId, null);
     return new PopulateMetadata(tableInfo);
   }
   
   @Override
-  public void undo(long tid, Master environment) throws Exception {
-    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration()));
-    String dir = ServerConstants.getTablesDir() + "/" + tableInfo.tableId;
-    fs.delete(new Path(dir), true);
+  public void undo(long tid, Master master) throws Exception {
+    FileSystem fs = master.getFileSystem();
+    for(String dir : ServerConstants.getTablesDirs()) {
+      fs.deleteRecursively(new Path(dir + "/" + tableInfo.tableId));
+    }
+    
   }
 }
 

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java Wed Jun  5 17:19:25 2013
@@ -34,9 +34,9 @@ import org.apache.accumulo.core.data.Ran
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.GrepIterator;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.MetaDataTableScanner;
 import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -46,7 +46,6 @@ import org.apache.accumulo.server.proble
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
@@ -168,8 +167,10 @@ class CleanUp extends MasterRepo {
     if (refCount == 0) {
       // delete the map files
       try {
-        FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
-        fs.delete(new Path(ServerConstants.getTablesDir(), tableId), true);
+        FileSystem fs = master.getFileSystem();
+        for (String dir : ServerConstants.getTablesDirs()) {
+          fs.deleteRecursively(new Path(dir, tableId));
+        }
       } catch (IOException e) {
         log.error("Unable to remove deleted table directory", e);
       }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java Wed Jun  5 17:19:25 2013
@@ -380,11 +380,12 @@ class CreateImportDir extends MasterRepo
   }
   
   @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
+  public Repo<Master> call(long tid, Master master) throws Exception {
     
     UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
     
-    Path directory = new Path(ServerConstants.getTablesDir() + "/" + tableInfo.tableId);
+    Path base = master.getFileSystem().matchingFileSystem(new Path(tableInfo.exportDir), ServerConstants.getTablesDirs());
+    Path directory = new Path(base, tableInfo.tableId);
     
     Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
     

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java Wed Jun  5 17:19:25 2013
@@ -28,8 +28,6 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
@@ -55,15 +53,17 @@ import org.apache.accumulo.core.util.Loc
 import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReportingIterator;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
 import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
 
@@ -118,9 +118,9 @@ public class Compactor implements Callab
     IteratorScope getIteratorScope();
   }
   
-  private Map<String,DataFileValue> filesToCompact;
+  private Map<FileRef,DataFileValue> filesToCompact;
   private InMemoryMap imm;
-  private String outputFile;
+  private FileRef outputFile;
   private boolean propogateDeletes;
   private TableConfiguration acuTableConf;
   private CompactionEnv env;
@@ -217,9 +217,10 @@ public class Compactor implements Callab
         iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName()));
         iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
       }
-      
-      return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, new ArrayList<String>(
-          compactor.filesToCompact.keySet()), compactor.outputFile, type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
+      List<String> filesToCompact = new ArrayList<String>();
+      for (FileRef ref : compactor.filesToCompact.keySet())
+        filesToCompact.add(ref.toString());
+      return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, filesToCompact, compactor.outputFile.toString(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
     }
   }
   
@@ -235,7 +236,7 @@ public class Compactor implements Callab
     return compactions;
   }
 
-  Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes,
+  Compactor(Configuration conf, FileSystem fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
       TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) {
     this.extent = extent;
     this.conf = conf;
@@ -252,7 +253,7 @@ public class Compactor implements Callab
     startTime = System.currentTimeMillis();
   }
   
-  Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes,
+  Compactor(Configuration conf, FileSystem fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
       TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
     this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null);
   }
@@ -266,7 +267,7 @@ public class Compactor implements Callab
   }
   
   String getOutputFile() {
-    return outputFile;
+    return outputFile.toString();
   }
   
   @Override
@@ -282,7 +283,8 @@ public class Compactor implements Callab
 
     try {
       FileOperations fileFactory = FileOperations.getInstance();
-      mfw = fileFactory.openWriter(outputFile, fs, conf, acuTableConf);
+      org.apache.hadoop.fs.FileSystem ns = this.fs.getFileSystemByPath(outputFile.path().toString());
+      mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf);
       
       Map<String,Set<ByteSequence>> lGroups;
       try {
@@ -314,7 +316,7 @@ public class Compactor implements Callab
       
       // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
       try {
-        FileSKVIterator openReader = fileFactory.openReader(outputFile, false, fs, conf, acuTableConf);
+        FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf);
         openReader.close();
       } catch (IOException ex) {
         log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex);
@@ -324,7 +326,7 @@ public class Compactor implements Callab
       log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
           majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
       
-      majCStats.setFileSize(fileFactory.getFileSize(outputFile, fs, conf, acuTableConf));
+      majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf));
       return majCStats;
     } catch (IOException e) {
       log.error(e, e);
@@ -343,9 +345,8 @@ public class Compactor implements Callab
           try {
             mfw.close();
           } finally {
-            Path path = new Path(outputFile);
-            if (!fs.delete(path, true))
-              if (fs.exists(path))
+            if (!fs.deleteRecursively(outputFile.path()))
+              if (fs.exists(outputFile.path()))
                 log.error("Unable to delete " + outputFile);
           }
         }
@@ -359,18 +360,18 @@ public class Compactor implements Callab
     
     List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
     
-    for (String mapFile : filesToCompact.keySet()) {
+    for (FileRef mapFile : filesToCompact.keySet()) {
       try {
         
         FileOperations fileFactory = FileOperations.getInstance();
-        
+        org.apache.hadoop.fs.FileSystem fs = this.fs.getFileSystemByPath(mapFile.path().toString());
         FileSKVIterator reader;
         
-        reader = fileFactory.openReader(mapFile, false, fs, conf, acuTableConf);
+        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, conf, acuTableConf);
         
         readers.add(reader);
         
-        SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile, false, reader);
+        SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader);
         
         if (filesToCompact.get(mapFile).isTimeSet()) {
           iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
@@ -380,7 +381,7 @@ public class Compactor implements Callab
         
       } catch (Throwable e) {
         
-        ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile, e));
+        ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e));
         
         log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e);
         // failed to open some map file... close the ones that were opened
@@ -462,7 +463,7 @@ public class Compactor implements Callab
             } catch (IOException e) {
               log.error(e, e);
             }
-            fs.delete(new Path(outputFile), true);
+            fs.deleteRecursively(outputFile.path());
           } catch (Exception e) {
             log.warn("Failed to delete Canceled compaction output file " + outputFile, e);
           }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java Wed Jun  5 17:19:25 2013
@@ -42,7 +42,9 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
 import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReportingIterator;
@@ -239,8 +241,7 @@ public class FileManager {
   }
   
   private List<String> takeOpenFiles(Collection<String> files, List<FileSKVIterator> reservedFiles, Map<FileSKVIterator,String> readersReserved) {
-    List<String> filesToOpen;
-    filesToOpen = new LinkedList<String>(files);
+    List<String> filesToOpen = new LinkedList<String>(files);
     for (Iterator<String> iterator = filesToOpen.iterator(); iterator.hasNext();) {
       String file = iterator.next();
       
@@ -305,7 +306,8 @@ public class FileManager {
     for (String file : filesToOpen) {
       try {
         // log.debug("Opening "+file);
-        org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(file);
+        String path = fs.getFullPath(ServerConstants.getTablesDirs(), file);
+        org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
         FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
             dataCache, indexCache);
         reservedFiles.add(reader);
@@ -454,6 +456,13 @@ public class FileManager {
       }
     }
     
+    private List<FileSKVIterator> openFileRefs(Collection<FileRef> files) throws TooManyFilesException, IOException {
+      List<String> strings = new ArrayList<String>(files.size());
+      for (FileRef ref : files)
+        strings.add(ref.path().toString());
+      return openFiles(strings);
+    }
+    
     private List<FileSKVIterator> openFiles(Collection<String> files) throws TooManyFilesException, IOException {
       // one tablet can not open more than maxOpen files, otherwise it could get stuck
       // forever waiting on itself to release files
@@ -469,9 +478,9 @@ public class FileManager {
       return newlyReservedReaders;
     }
     
-    synchronized List<InterruptibleIterator> openFiles(Map<String,DataFileValue> files, boolean detachable) throws IOException {
+    synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException {
       
-      List<FileSKVIterator> newlyReservedReaders = openFiles(files.keySet());
+      List<FileSKVIterator> newlyReservedReaders = openFileRefs(files.keySet());
       
       ArrayList<InterruptibleIterator> iters = new ArrayList<InterruptibleIterator>();
       
@@ -486,9 +495,9 @@ public class FileManager {
         } else {
           iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, reader);
         }
-        
-        if (files.get(filename).isTimeSet()) {
-          iter = new TimeSettingIterator(iter, files.get(filename).getTime());
+        DataFileValue value = files.get(new FileRef(filename));
+        if (value.isTimeSet()) {
+          iter = new TimeSettingIterator(iter, value.getTime());
         }
         
         iters.add(iter);

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java Wed Jun  5 17:19:25 2013
@@ -35,7 +35,8 @@ import org.apache.accumulo.server.proble
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -43,16 +44,16 @@ public class MinorCompactor extends Comp
   
   private static final Logger log = Logger.getLogger(MinorCompactor.class);
   
-  private static final Map<String,DataFileValue> EMPTY_MAP = Collections.emptyMap();
+  private static final Map<FileRef,DataFileValue> EMPTY_MAP = Collections.emptyMap();
   
-  private static Map<String,DataFileValue> toFileMap(String mergeFile, DataFileValue dfv) {
+  private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, DataFileValue dfv) {
     if (mergeFile == null)
       return EMPTY_MAP;
     
     return Collections.singletonMap(mergeFile, dfv);
   }
   
-  MinorCompactor(Configuration conf, FileSystem fs, InMemoryMap imm, String mergeFile, DataFileValue dfv, String outputFile, TableConfiguration acuTableConf,
+  MinorCompactor(Configuration conf, FileSystem fs, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, TableConfiguration acuTableConf,
       KeyExtent extent, MinorCompactionReason mincReason) {
     super(conf, fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new CompactionEnv() {
       
@@ -127,7 +128,7 @@ public class MinorCompactor extends Comp
         // clean up
         try {
           if (getFileSystem().exists(new Path(getOutputFile()))) {
-            getFileSystem().delete(new Path(getOutputFile()), true);
+            getFileSystem().deleteRecursively(new Path(getOutputFile()));
           }
         } catch (IOException e) {
           log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());



Mime
View raw message