accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1494759 [2/5] - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/client/admin/ core/src/main/java/org/apache/accumulo/core/client/impl/ core/src/main/java/org/apache/accumulo/core/client/mock/ core/src/main/java/org/apache/a...
Date Wed, 19 Jun 2013 20:18:32 GMT
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java Wed Jun 19 20:18:30 2013
@@ -66,7 +66,6 @@ import org.apache.accumulo.core.data.Par
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -84,7 +83,6 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.RootTable;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -101,6 +99,9 @@ import org.apache.accumulo.server.Accumu
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
 import org.apache.accumulo.server.master.balancer.TabletBalancer;
@@ -144,7 +145,6 @@ import org.apache.accumulo.server.securi
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.accumulo.server.util.DefaultMap;
 import org.apache.accumulo.server.util.Halt;
@@ -159,7 +159,6 @@ import org.apache.accumulo.server.zookee
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 import org.apache.accumulo.trace.thrift.TInfo;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -193,7 +192,7 @@ public class Master implements LiveTServ
   final private static int MAX_TSERVER_WORK_CHUNK = 5000;
   final private static int MAX_BAD_STATUS_COUNT = 3;
   
-  final private FileSystem fs;
+  final private VolumeManager fs;
   final private Instance instance;
   final private String hostname;
   final private LiveTServerSet tserverSet;
@@ -452,10 +451,10 @@ public class Master implements LiveTServ
     return instance;
   }
   
-  public Master(ServerConfiguration config, FileSystem fs, String hostname) throws IOException {
+  public Master(ServerConfiguration config, VolumeManager fs, String hostname) throws IOException {
     this.serverConfig = config;
     this.instance = config.getInstance();
-    this.fs = TraceFileSystem.wrap(fs);
+    this.fs = fs;
     this.hostname = hostname;
     
     AccumuloConfiguration aconf = serverConfig.getConfiguration();
@@ -1575,11 +1574,11 @@ public class Master implements LiveTServ
         MetadataTable.TIME_COLUMN.fetch(scanner);
         scanner.fetchColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY);
         scanner.fetchColumnFamily(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY);
-        Set<String> datafiles = new TreeSet<String>();
+        Set<FileRef> datafiles = new TreeSet<FileRef>();
         for (Entry<Key,Value> entry : scanner) {
           Key key = entry.getKey();
           if (key.compareColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY) == 0) {
-            datafiles.add(key.getColumnQualifier().toString());
+            datafiles.add(new FileRef(fs, key));
             if (datafiles.size() > 1000) {
               MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
               datafiles.clear();
@@ -1589,7 +1588,7 @@ public class Master implements LiveTServ
           } else if (key.compareColumnFamily(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY) == 0) {
             throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
           } else if (MetadataTable.DIRECTORY_COLUMN.hasColumns(key)) {
-            datafiles.add(entry.getValue().toString());
+            datafiles.add(new FileRef(fs, key));
             if (datafiles.size() > 1000) {
               MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
               datafiles.clear();
@@ -2228,7 +2227,7 @@ public class Master implements LiveTServ
     try {
       SecurityUtil.serverLogin();
       
-      FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+      VolumeManager fs = VolumeManagerImpl.get();
       String hostname = Accumulo.getLocalAddress(args);
       Instance instance = HdfsZooInstance.getInstance();
       ServerConfiguration conf = new ServerConfiguration(instance);
@@ -2380,7 +2379,7 @@ public class Master implements LiveTServ
     return serverConfig;
   }
   
-  public FileSystem getFileSystem() {
+  public VolumeManager getFileSystem() {
     return this.fs;
   }
   

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java Wed Jun 19 20:18:30 2013
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.server.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,10 +33,10 @@ public class HadoopLogCloser implements 
   private static Logger log = Logger.getLogger(HadoopLogCloser.class);
 
   @Override
-  public long close(Master master, FileSystem fs, Path source) throws IOException {
-    
-    if (fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+  public long close(Master master, VolumeManager fs, Path source) throws IOException {
+    FileSystem ns = fs.getFileSystemByPath(source);
+    if (ns instanceof DistributedFileSystem) {
+      DistributedFileSystem dfs = (DistributedFileSystem) ns;
       try {
         if (!dfs.recoverLease(source)) {
           log.info("Waiting for file to be closed " + source.toString());
@@ -48,12 +49,12 @@ public class HadoopLogCloser implements 
       } catch (Exception ex) {
         log.warn("Error recovery lease on " + source.toString(), ex);
       }
-    } else if (fs instanceof LocalFileSystem) {
+    } else if (ns instanceof LocalFileSystem) {
       // ignore
     } else {
       throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
     }
-    fs.append(source).close();
+    ns.append(source).close();
     log.info("Recovered lease on " + source.toString() + " using append");
     return 0;
   }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java Wed Jun 19 20:18:30 2013
@@ -19,9 +19,9 @@ package org.apache.accumulo.server.maste
 import java.io.IOException;
 
 import org.apache.accumulo.server.master.Master;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.hadoop.fs.Path;
 
 public interface LogCloser {
-  public long close(Master master, FileSystem fs, Path path) throws IOException;
+  public long close(Master master, VolumeManager fs, Path path) throws IOException;
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java Wed Jun 19 20:18:30 2013
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.maste
 import java.io.IOException;
 
 import org.apache.accumulo.server.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -29,11 +30,12 @@ public class MapRLogCloser implements Lo
   private static Logger log = Logger.getLogger(MapRLogCloser.class);
   
   @Override
-  public long close(Master m, FileSystem fs, Path path) throws IOException {
+  public long close(Master m, VolumeManager fs, Path path) throws IOException {
     log.info("Recovering file " + path.toString() + " by changing permission to readonly");
+    FileSystem ns = fs.getFileSystemByPath(path);
     FsPermission roPerm = new FsPermission((short) 0444);
     try {
-      fs.setPermission(path, roPerm);
+      ns.setPermission(path, roPerm);
       return 0;
     } catch (IOException ex) {
       log.error("error recovering lease ", ex);

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java Wed Jun 19 20:18:30 2013
@@ -36,10 +36,8 @@ import org.apache.accumulo.core.util.Nam
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
@@ -68,40 +66,39 @@ public class RecoveryManager {
   }
   
   private class LogSortTask implements Runnable {
-    private String filename;
-    private String host;
+    private String source;
+    private String destination;
+    private String sortId;
     private LogCloser closer;
     
-    public LogSortTask(LogCloser closer, String host, String filename) {
+    public LogSortTask(LogCloser closer, String source, String destination, String sortId) {
       this.closer = closer;
-      this.host = host;
-      this.filename = filename;
+      this.source = source;
+      this.destination = destination;
+      this.sortId = sortId;
     }
     
     @Override
     public void run() {
       boolean rescheduled = false;
       try {
-        FileSystem localFs = master.getFileSystem();
-        if (localFs instanceof TraceFileSystem)
-          localFs = ((TraceFileSystem) localFs).getImplementation();
         
-        long time = closer.close(master, localFs, getSource(host, filename));
+        long time = closer.close(master, master.getFileSystem(), new Path(source));
         
         if (time > 0) {
           executor.schedule(this, time, TimeUnit.MILLISECONDS);
           rescheduled = true;
         } else {
-          initiateSort(host, filename);
+          initiateSort(sortId, source, destination);
         }
       } catch (FileNotFoundException e) {
-        log.debug("Unable to initate log sort for " + filename + ": " + e);
+        log.debug("Unable to initate log sort for " + source + ": " + e);
       } catch (Exception e) {
-        log.warn("Failed to initiate log sort " + filename, e);
+        log.warn("Failed to initiate log sort " + source, e);
       } finally {
         if (!rescheduled) {
           synchronized (RecoveryManager.this) {
-            closeTasksQueued.remove(filename);
+            closeTasksQueued.remove(sortId);
           }
         }
       }
@@ -109,61 +106,57 @@ public class RecoveryManager {
     
   }
   
-  private void initiateSort(String host, final String file) throws KeeperException, InterruptedException {
-    String source = getSource(host, file).toString();
-    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(file, source.getBytes());
+  private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException {
+    String work =  source + "|" + destination; 
+    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes());
     
     synchronized (this) {
-      sortsQueued.add(file);
+      sortsQueued.add(sortId);
     }
     
-    final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;
-    log.info("Created zookeeper entry " + path + " with data " + source);
-  }
-  
-  private Path getSource(String server, String file) {
-    String source = ServerConstants.getWalDirectory() + "/" + server + "/" + file;
-    if (server.contains(":")) {
-      // old-style logger log, copied from local file systems by tservers, unsorted into the wal base dir
-      source = ServerConstants.getWalDirectory() + "/" + file;
-    }
-    return new Path(source);
+    final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId;
+    log.info("Created zookeeper entry " + path + " with data " + work);
   }
   
   public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
     boolean recoveryNeeded = false;
+    ;
     for (Collection<String> logs : walogs) {
       for (String walog : logs) {
-        String parts[] = walog.split("/");
-        String host = parts[0];
-        String filename = parts[1];
+        String hostFilename[] = walog.split("/", 2);
+        String host = hostFilename[0];
+        String filename = hostFilename[1];
+        String parts[] = filename.split("/");
+        String sortId = parts[parts.length - 1];
+        String dest = master.getFileSystem().choose(ServerConstants.getRecoveryDirs()) + "/" + sortId;
+        log.debug("Recovering " + filename + " to " + dest);
         
         boolean sortQueued;
         synchronized (this) {
-          sortQueued = sortsQueued.contains(filename);
+          sortQueued = sortsQueued.contains(sortId);
         }
         
-        if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + filename) == null) {
+        if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId) == null) {
           synchronized (this) {
-            sortsQueued.remove(filename);
+            sortsQueued.remove(sortId);
           }
         }
-        
-        if (master.getFileSystem().exists(new Path(ServerConstants.getRecoveryDir() + "/" + filename + "/finished"))) {
+
+        if (master.getFileSystem().exists(new Path(dest, "finished"))) {
           synchronized (this) {
-            closeTasksQueued.remove(filename);
-            recoveryDelay.remove(filename);
-            sortsQueued.remove(filename);
+            closeTasksQueued.remove(sortId);
+            recoveryDelay.remove(sortId);
+            sortsQueued.remove(sortId);
           }
           continue;
         }
         
         recoveryNeeded = true;
         synchronized (this) {
-          if (!closeTasksQueued.contains(filename) && !sortsQueued.contains(filename)) {
+          if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
             AccumuloConfiguration aconf = master.getConfiguration().getConfiguration();
             LogCloser closer = Master.createInstanceFromPropertyName(aconf, Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class, new HadoopLogCloser());
-            Long delay = recoveryDelay.get(filename);
+            Long delay = recoveryDelay.get(sortId);
             if (delay == null) {
               delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
             } else {
@@ -172,9 +165,9 @@ public class RecoveryManager {
             
             log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference");
             
-            executor.schedule(new LogSortTask(closer, host, filename), delay, TimeUnit.MILLISECONDS);
-            closeTasksQueued.add(filename);
-            recoveryDelay.put(filename, delay);
+            executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
+            closeTasksQueued.add(sortId);
+            recoveryDelay.put(sortId, delay);
           }
         }
       }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java Wed Jun 19 20:18:30 2013
@@ -17,17 +17,15 @@
 package org.apache.accumulo.server.master.state;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.master.thrift.MasterGoalState;
 import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.FileSystem;
 
 public class SetGoalState {
   
@@ -41,7 +39,7 @@ public class SetGoalState {
     }
     SecurityUtil.serverLogin();
 
-    FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+    VolumeManager fs = VolumeManagerImpl.get();
     Accumulo.waitForZookeeperAndHdfs(fs);
     ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZMASTER_GOAL_STATE, args[0].getBytes(),
         NodeExistsPolicy.OVERWRITE);

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Wed Jun 19 20:18:30 2013
@@ -62,6 +62,7 @@ import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.TServerInstance;
@@ -75,7 +76,6 @@ import org.apache.accumulo.trace.instrum
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.Text;
@@ -146,7 +146,7 @@ public class BulkImport extends MasterRe
     Utils.getReadLock(tableId, tid).lock();
     
     // check that the error directory exists and is empty
-    FileSystem fs = master.getFileSystem();
+    VolumeManager fs = master.getFileSystem();
     
     Path errorPath = new Path(errorDir);
     FileStatus errorStatus = null;
@@ -179,8 +179,23 @@ public class BulkImport extends MasterRe
     }
   }
   
-  private Path createNewBulkDir(FileSystem fs, String tableId) throws IOException {
-    Path directory = new Path(ServerConstants.getTablesDir() + "/" + tableId);
+  private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
+    String tableDir = null;
+    loop:
+    for (String dir : fs.getFileSystems().keySet()) {
+      if (this.sourceDir.startsWith(dir)) {
+        for (String path : ServerConstants.getTablesDirs()) {
+          if (path.startsWith(dir)) {
+            tableDir = path;
+            break loop;
+          }
+        }
+        break;
+      }
+    }
+    if (tableDir == null)
+      throw new IllegalStateException(sourceDir + " is not in a known namespace");
+    Path directory = new Path(tableDir + "/" + tableId);
     fs.mkdirs(directory);
     
     // only one should be able to create the lock file
@@ -203,7 +218,7 @@ public class BulkImport extends MasterRe
     }
   }
   
-  private String prepareBulkImport(FileSystem fs, String dir, String tableId) throws IOException {
+  private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException {
     Path bulkDir = createNewBulkDir(fs, tableId);
     
     MetadataTable.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
@@ -369,7 +384,7 @@ class CopyFailed extends MasterRepo {
   public Repo<Master> call(long tid, Master master) throws Exception {
     // This needs to execute after the arbiter is stopped
     
-    FileSystem fs = master.getFileSystem();
+    VolumeManager fs = master.getFileSystem();
     
     if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
       return new CleanUpBulkImport(tableId, source, bulk, error);
@@ -440,7 +455,7 @@ class CopyFailed extends MasterRepo {
       bifCopyQueue.waitUntilDone(workIds);
     }
     
-    fs.delete(new Path(error, BulkImport.FAILURES_TXT), true);
+    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
     return new CleanUpBulkImport(tableId, source, bulk, error);
   }
   
@@ -490,7 +505,7 @@ class LoadFiles extends MasterRepo {
   public Repo<Master> call(final long tid, final Master master) throws Exception {
     initializeThreadPool(master);
     final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
-    FileSystem fs = master.getFileSystem();
+    VolumeManager fs = master.getFileSystem();
     List<FileStatus> files = new ArrayList<FileStatus>();
     for (FileStatus entry : fs.listStatus(new Path(bulk))) {
       files.add(entry);
@@ -500,12 +515,12 @@ class LoadFiles extends MasterRepo {
     Path writable = new Path(this.errorDir, ".iswritable");
     if (!fs.createNewFile(writable)) {
       // Maybe this is a re-try... clear the flag and try again
-      fs.delete(writable, false);
+      fs.delete(writable);
       if (!fs.createNewFile(writable))
         throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
             "Unable to write to " + this.errorDir);
     }
-    fs.delete(writable, false);
+    fs.delete(writable);
     
     final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
     for (FileStatus f : files)

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java Wed Jun 19 20:18:30 2013
@@ -27,25 +27,21 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.util.TablePropUtil;
 import org.apache.accumulo.server.util.TabletOperations;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
@@ -148,18 +144,19 @@ class CreateDir extends MasterRepo {
   }
   
   @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
-    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration()));
-    String dir = ServerConstants.getTablesDir() + "/" + tableInfo.tableId;
-    TabletOperations.createTabletDirectory(fs, dir, null);
+  public Repo<Master> call(long tid, Master master) throws Exception {
+    VolumeManager fs = master.getFileSystem();
+    TabletOperations.createTabletDirectory(fs, tableInfo.tableId, null);
     return new PopulateMetadata(tableInfo);
   }
   
   @Override
-  public void undo(long tid, Master environment) throws Exception {
-    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration()));
-    String dir = ServerConstants.getTablesDir() + "/" + tableInfo.tableId;
-    fs.delete(new Path(dir), true);
+  public void undo(long tid, Master master) throws Exception {
+    VolumeManager fs = master.getFileSystem();
+    for(String dir : ServerConstants.getTablesDirs()) {
+      fs.deleteRecursively(new Path(dir + "/" + tableInfo.tableId));
+    }
+    
   }
 }
 

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java Wed Jun 19 20:18:30 2013
@@ -34,9 +34,9 @@ import org.apache.accumulo.core.data.Val
 import org.apache.accumulo.core.iterators.user.GrepIterator;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.MetaDataTableScanner;
 import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -46,7 +46,6 @@ import org.apache.accumulo.server.proble
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
@@ -168,8 +167,10 @@ class CleanUp extends MasterRepo {
     if (refCount == 0) {
       // delete the map files
       try {
-        FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
-        fs.delete(new Path(ServerConstants.getTablesDir(), tableId), true);
+        VolumeManager fs = master.getFileSystem();
+        for (String dir : ServerConstants.getTablesDirs()) {
+          fs.deleteRecursively(new Path(dir, tableId));
+        }
       } catch (IOException e) {
         log.error("Unable to remove deleted table directory", e);
       }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java Wed Jun 19 20:18:30 2013
@@ -22,7 +22,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
-import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -52,9 +51,9 @@ import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.master.Master;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
@@ -141,7 +140,7 @@ class WriteExportFiles extends MasterRep
     Utils.unreserveTable(tableInfo.tableID, tid, false);
   }
   
-  public static void exportTable(FileSystem fs, Connector conn, String tableName, String tableID, String exportDir) throws Exception {
+  public static void exportTable(VolumeManager fs, Connector conn, String tableName, String tableID, String exportDir) throws Exception {
     
     fs.mkdirs(new Path(exportDir));
     
@@ -171,7 +170,7 @@ class WriteExportFiles extends MasterRep
       exportConfig(conn, tableID, zipOut, dataOut);
       dataOut.flush();
       
-      Map<String,String> uniqueFiles = exportMetadata(conn, tableID, zipOut, dataOut);
+      Map<String,String> uniqueFiles = exportMetadata(fs, conn, tableID, zipOut, dataOut);
       
       dataOut.close();
       dataOut = null;
@@ -184,24 +183,16 @@ class WriteExportFiles extends MasterRep
     }
   }
   
-  private static void createDistcpFile(FileSystem fs, String exportDir, Path exportMetaFilePath, Map<String,String> uniqueFiles) throws IOException {
+  private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath, Map<String,String> uniqueFiles) throws IOException {
     BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt"), false)));
     
     try {
-      URI uri = fs.getUri();
-      
-      for (String relPath : uniqueFiles.values()) {
-        Path absPath = new Path(uri.getScheme(), uri.getAuthority(), ServerConstants.getTablesDir() + relPath);
-        distcpOut.append(absPath.toUri().toString());
+      for (String file : uniqueFiles.values()) {
+        distcpOut.append(file);
         distcpOut.newLine();
       }
       
-      Path absEMP = exportMetaFilePath;
-      if (!exportMetaFilePath.isAbsolute())
-        absEMP = new Path(fs.getWorkingDirectory().toUri().getPath(), exportMetaFilePath);
-      
-      distcpOut.append(new Path(uri.getScheme(), uri.getAuthority(), absEMP.toString()).toUri().toString());
-      
+      distcpOut.append(exportMetaFilePath.toString());
       distcpOut.newLine();
       
       distcpOut.close();
@@ -213,7 +204,7 @@ class WriteExportFiles extends MasterRep
     }
   }
   
-  private static Map<String,String> exportMetadata(Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws IOException,
+  private static Map<String,String> exportMetadata(VolumeManager fs, Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws IOException,
       TableNotFoundException {
     zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_METADATA_FILE));
     
@@ -230,24 +221,18 @@ class WriteExportFiles extends MasterRep
       entry.getValue().write(dataOut);
       
       if (entry.getKey().getColumnFamily().equals(MetadataTable.DATAFILE_COLUMN_FAMILY)) {
-        String relPath = entry.getKey().getColumnQualifierData().toString();
-        
-        if (relPath.startsWith("../"))
-          relPath = relPath.substring(2);
-        else
-          relPath = "/" + tableID + relPath;
-        
-        String tokens[] = relPath.split("/");
-        if (tokens.length != 4) {
-          throw new RuntimeException("Illegal path " + relPath);
+        String path = fs.getFullPath(entry.getKey()).toString();
+        String tokens[] = path.split("/");
+        if (tokens.length < 1) {
+          throw new RuntimeException("Illegal path " + path);
         }
         
-        String filename = tokens[3];
+        String filename = tokens[tokens.length - 1];
         
         String existingPath = uniqueFiles.get(filename);
         if (existingPath == null) {
-          uniqueFiles.put(filename, relPath);
-        } else if (!existingPath.equals(relPath)) {
+          uniqueFiles.put(filename, path);
+        } else if (!existingPath.equals(path)) {
           // make sure file names are unique, should only apply for tables with file names generated by Accumulo 1.3 and earlier
           throw new IOException("Cannot export table with nonunique file names " + filename + ". Major compact table.");
         }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java Wed Jun 19 20:18:30 2013
@@ -52,6 +52,7 @@ import org.apache.accumulo.fate.Repo;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -98,7 +99,7 @@ class FinishImportTable extends MasterRe
   @Override
   public Repo<Master> call(long tid, Master env) throws Exception {
     
-    env.getFileSystem().delete(new Path(tableInfo.importDir, "mappings.txt"), true);
+    env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir, "mappings.txt"));
     
     TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE);
     
@@ -136,7 +137,7 @@ class MoveExportedFiles extends MasterRe
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     try {
-      FileSystem fs = master.getFileSystem();
+      VolumeManager fs = master.getFileSystem();
       
       Map<String,String> fileNameMappings = PopulateMetadataTable.readMappingFile(fs, tableInfo);
       
@@ -175,7 +176,7 @@ class PopulateMetadataTable extends Mast
     this.tableInfo = ti;
   }
   
-  static Map<String,String> readMappingFile(FileSystem fs, ImportedTableInfo tableInfo) throws Exception {
+  static Map<String,String> readMappingFile(VolumeManager fs, ImportedTableInfo tableInfo) throws Exception {
     BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(tableInfo.importDir, "mappings.txt"))));
     
     try {
@@ -203,7 +204,7 @@ class PopulateMetadataTable extends Mast
     ZipInputStream zis = null;
     
     try {
-      FileSystem fs = master.getFileSystem();
+      VolumeManager fs = master.getFileSystem();
       
       mbw = master.getConnector().createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
       
@@ -311,10 +312,10 @@ class MapImportFileNames extends MasterR
     BufferedWriter mappingsWriter = null;
     
     try {
-      FileSystem fs = environment.getFileSystem();
+      VolumeManager fs = environment.getFileSystem();
       
       fs.mkdirs(new Path(tableInfo.importDir));
-      
+
       FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
       
       UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
@@ -323,7 +324,7 @@ class MapImportFileNames extends MasterR
       
       for (FileStatus fileStatus : files) {
         String fileName = fileStatus.getPath().getName();
-        
+        log.info("filename " + fileStatus.getPath().toString());
         String sa[] = fileName.split("\\.");
         String extension = "";
         if (sa.length > 1) {
@@ -365,7 +366,7 @@ class MapImportFileNames extends MasterR
   
   @Override
   public void undo(long tid, Master env) throws Exception {
-    env.getFileSystem().delete(new Path(tableInfo.importDir), true);
+    env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir));
   }
 }
 
@@ -380,11 +381,12 @@ class CreateImportDir extends MasterRepo
   }
   
   @Override
-  public Repo<Master> call(long tid, Master environment) throws Exception {
+  public Repo<Master> call(long tid, Master master) throws Exception {
     
     UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
     
-    Path directory = new Path(ServerConstants.getTablesDir() + "/" + tableInfo.tableId);
+    Path base = master.getFileSystem().matchingFileSystem(new Path(tableInfo.exportDir), ServerConstants.getTablesDirs());
+    Path directory = new Path(base, tableInfo.tableId);
     
     Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
     
@@ -409,12 +411,13 @@ class ImportPopulateZookeeper extends Ma
     return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.IMPORT);
   }
   
-  private Map<String,String> getExportedProps(FileSystem fs) throws Exception {
+  private Map<String,String> getExportedProps(VolumeManager fs) throws Exception {
     
     Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
     
     try {
-      return TableOperationsImpl.getExportedProps(fs, path);
+      FileSystem ns = fs.getFileSystemByPath(path);
+      return TableOperationsImpl.getExportedProps(ns, path);
     } catch (IOException ioe) {
       throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
           "Error reading table props from " + path + " " + ioe.getMessage());

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java Wed Jun 19 20:18:30 2013
@@ -16,24 +16,28 @@
  */
 package org.apache.accumulo.server.master.tableOps;
 
-import org.apache.accumulo.core.Constants;
+import java.util.Map.Entry;
+
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
 import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.RootTable;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.state.MergeInfo;
 import org.apache.accumulo.server.master.state.MergeInfo.Operation;
 import org.apache.accumulo.server.master.state.MergeState;
 import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -58,13 +62,14 @@ class MakeDeleteEntries extends MasterRe
   public Repo<Master> call(long tid, Master master) throws Exception {
     log.info("creating delete entries for merged metadata tablets");
     Connector conn = master.getConnector();
+    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(RootTable.KEYSPACE);
+    scanner.fetchColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY);
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-    String tableDir = ServerConstants.getMetadataTableDir();
-    for (FileStatus fs : master.getFileSystem().listStatus(new Path(tableDir))) {
+    for (Entry<Key,Value> entry : scanner) {
       // TODO: add the entries only if there are no !METADATA table references - ACCUMULO-1308
-      if (fs.isDir() && fs.getPath().getName().matches("^" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + ".*")) {
-        bw.addMutation(MetadataTable.createDeleteMutation(MetadataTable.ID, "/" + fs.getPath().getName()));
-      }
+      FileRef ref = new FileRef(master.getFileSystem(), entry.getKey());
+      bw.addMutation(MetadataTable.createDeleteMutation(MetadataTable.ID, ref.path().toString()));
     }
     bw.close();
     return null;

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java Wed Jun 19 20:18:30 2013
@@ -27,13 +27,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.MasterClient;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.gc.thrift.GCMonitorService;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.master.thrift.Compacting;
@@ -42,7 +40,6 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.master.thrift.TableInfo;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.Pair;
@@ -54,6 +51,8 @@ import org.apache.accumulo.core.zookeepe
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.monitor.servlets.DefaultServlet;
 import org.apache.accumulo.server.monitor.servlets.GcStatusServlet;
 import org.apache.accumulo.server.monitor.servlets.JSONServlet;
@@ -73,7 +72,7 @@ import org.apache.accumulo.server.proble
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.EmbeddedWebServer;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -450,7 +449,7 @@ public class Monitor {
   public static void main(String[] args) throws Exception {
     SecurityUtil.serverLogin();
     
-    FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+    VolumeManager fs = VolumeManagerImpl.get();
     String hostname = Accumulo.getLocalAddress(args);
     instance = HdfsZooInstance.getInstance();
     config = new ServerConfiguration(instance);

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java Wed Jun 19 20:18:30 2013
@@ -28,8 +28,6 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
@@ -55,15 +53,18 @@ import org.apache.accumulo.core.util.Loc
 import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReportingIterator;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
 import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
 
@@ -118,14 +119,14 @@ public class Compactor implements Callab
     IteratorScope getIteratorScope();
   }
   
-  private Map<String,DataFileValue> filesToCompact;
+  private Map<FileRef,DataFileValue> filesToCompact;
   private InMemoryMap imm;
-  private String outputFile;
+  private FileRef outputFile;
   private boolean propogateDeletes;
   private TableConfiguration acuTableConf;
   private CompactionEnv env;
   private Configuration conf;
-  private FileSystem fs;
+  private VolumeManager fs;
   protected KeyExtent extent;
   private List<IteratorSetting> iterators;
   
@@ -217,9 +218,10 @@ public class Compactor implements Callab
         iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName()));
         iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
       }
-      
-      return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, new ArrayList<String>(
-          compactor.filesToCompact.keySet()), compactor.outputFile, type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
+      List<String> filesToCompact = new ArrayList<String>();
+      for (FileRef ref : compactor.filesToCompact.keySet())
+        filesToCompact.add(ref.toString());
+      return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, filesToCompact, compactor.outputFile.toString(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
     }
   }
   
@@ -235,7 +237,7 @@ public class Compactor implements Callab
     return compactions;
   }
 
-  Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes,
+  Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
       TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) {
     this.extent = extent;
     this.conf = conf;
@@ -252,12 +254,12 @@ public class Compactor implements Callab
     startTime = System.currentTimeMillis();
   }
   
-  Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes,
+  Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
       TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
     this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null);
   }
   
-  public FileSystem getFileSystem() {
+  public VolumeManager getFileSystem() {
     return fs;
   }
   
@@ -266,7 +268,7 @@ public class Compactor implements Callab
   }
   
   String getOutputFile() {
-    return outputFile;
+    return outputFile.toString();
   }
   
   @Override
@@ -282,7 +284,8 @@ public class Compactor implements Callab
 
     try {
       FileOperations fileFactory = FileOperations.getInstance();
-      mfw = fileFactory.openWriter(outputFile, fs, conf, acuTableConf);
+      FileSystem ns = this.fs.getFileSystemByPath(outputFile.path());
+      mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf);
       
       Map<String,Set<ByteSequence>> lGroups;
       try {
@@ -314,7 +317,7 @@ public class Compactor implements Callab
       
       // Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
       try {
-        FileSKVIterator openReader = fileFactory.openReader(outputFile, false, fs, conf, acuTableConf);
+        FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf);
         openReader.close();
       } catch (IOException ex) {
         log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex);
@@ -324,7 +327,7 @@ public class Compactor implements Callab
       log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
           majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
       
-      majCStats.setFileSize(fileFactory.getFileSize(outputFile, fs, conf, acuTableConf));
+      majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf));
       return majCStats;
     } catch (IOException e) {
       log.error(e, e);
@@ -343,9 +346,8 @@ public class Compactor implements Callab
           try {
             mfw.close();
           } finally {
-            Path path = new Path(outputFile);
-            if (!fs.delete(path, true))
-              if (fs.exists(path))
+            if (!fs.deleteRecursively(outputFile.path()))
+              if (fs.exists(outputFile.path()))
                 log.error("Unable to delete " + outputFile);
           }
         }
@@ -359,18 +361,18 @@ public class Compactor implements Callab
     
     List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
     
-    for (String mapFile : filesToCompact.keySet()) {
+    for (FileRef mapFile : filesToCompact.keySet()) {
       try {
         
         FileOperations fileFactory = FileOperations.getInstance();
-        
+        FileSystem fs = this.fs.getFileSystemByPath(mapFile.path());
         FileSKVIterator reader;
         
-        reader = fileFactory.openReader(mapFile, false, fs, conf, acuTableConf);
+        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, conf, acuTableConf);
         
         readers.add(reader);
         
-        SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile, false, reader);
+        SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader);
         
         if (filesToCompact.get(mapFile).isTimeSet()) {
           iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
@@ -380,7 +382,7 @@ public class Compactor implements Callab
         
       } catch (Throwable e) {
         
-        ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile, e));
+        ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e));
         
         log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e);
         // failed to open some map file... close the ones that were opened
@@ -462,7 +464,7 @@ public class Compactor implements Callab
             } catch (IOException e) {
               log.error(e, e);
             }
-            fs.delete(new Path(outputFile), true);
+            fs.deleteRecursively(outputFile.path());
           } catch (Exception e) {
             log.warn("Failed to delete Canceled compaction output file " + outputFile, e);
           }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java Wed Jun 19 20:18:30 2013
@@ -42,13 +42,17 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
 import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReportingIterator;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
@@ -99,7 +103,7 @@ public class FileManager {
   
   private Semaphore filePermits;
   
-  private FileSystem fs;
+  private VolumeManager fs;
   
   // the data cache and index cache are allocated in
   // TabletResourceManager and passed through the file opener to
@@ -158,7 +162,7 @@ public class FileManager {
    * @param indexCache
    *          : underlying file can and should be able to handle a null cache
    */
-  FileManager(ServerConfiguration conf, FileSystem fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) {
+  FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) {
     
     if (maxOpen <= 0)
       throw new IllegalArgumentException("maxOpen <= 0");
@@ -239,8 +243,7 @@ public class FileManager {
   }
   
   private List<String> takeOpenFiles(Collection<String> files, List<FileSKVIterator> reservedFiles, Map<FileSKVIterator,String> readersReserved) {
-    List<String> filesToOpen;
-    filesToOpen = new LinkedList<String>(files);
+    List<String> filesToOpen = new LinkedList<String>(files);
     for (Iterator<String> iterator = filesToOpen.iterator(); iterator.hasNext();) {
       String file = iterator.next();
       
@@ -304,8 +307,10 @@ public class FileManager {
     // open any files that need to be opened
     for (String file : filesToOpen) {
       try {
-        // log.debug("Opening "+file);
-        FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, fs.getConf(), conf.getTableConfiguration(table.toString()),
+        Path path = fs.getFullPath(ServerConstants.getTablesDirs(), file);
+        FileSystem ns = fs.getFileSystemByPath(path);
+        //log.debug("Opening "+file + " path " + path);
+        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
             dataCache, indexCache);
         reservedFiles.add(reader);
         readersReserved.put(reader, file);
@@ -453,6 +458,13 @@ public class FileManager {
       }
     }
     
+    private List<FileSKVIterator> openFileRefs(Collection<FileRef> files) throws TooManyFilesException, IOException {
+      List<String> strings = new ArrayList<String>(files.size());
+      for (FileRef ref : files)
+        strings.add(ref.path().toString());
+      return openFiles(strings);
+    }
+    
     private List<FileSKVIterator> openFiles(Collection<String> files) throws TooManyFilesException, IOException {
       // one tablet can not open more than maxOpen files, otherwise it could get stuck
       // forever waiting on itself to release files
@@ -468,9 +480,9 @@ public class FileManager {
       return newlyReservedReaders;
     }
     
-    synchronized List<InterruptibleIterator> openFiles(Map<String,DataFileValue> files, boolean detachable) throws IOException {
+    synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException {
       
-      List<FileSKVIterator> newlyReservedReaders = openFiles(files.keySet());
+      List<FileSKVIterator> newlyReservedReaders = openFileRefs(files.keySet());
       
       ArrayList<InterruptibleIterator> iters = new ArrayList<InterruptibleIterator>();
       
@@ -485,9 +497,9 @@ public class FileManager {
         } else {
           iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, reader);
         }
-        
-        if (files.get(filename).isTimeSet()) {
-          iter = new TimeSettingIterator(iter, files.get(filename).getTime());
+        DataFileValue value = files.get(new FileRef(filename));
+        if (value.isTimeSet()) {
+          iter = new TimeSettingIterator(iter, value.getTime());
         }
         
         iters.add(iter);

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java Wed Jun 19 20:18:30 2013
@@ -34,7 +34,8 @@ import org.apache.accumulo.server.proble
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -42,16 +43,16 @@ public class MinorCompactor extends Comp
   
   private static final Logger log = Logger.getLogger(MinorCompactor.class);
   
-  private static final Map<String,DataFileValue> EMPTY_MAP = Collections.emptyMap();
+  private static final Map<FileRef,DataFileValue> EMPTY_MAP = Collections.emptyMap();
   
-  private static Map<String,DataFileValue> toFileMap(String mergeFile, DataFileValue dfv) {
+  private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, DataFileValue dfv) {
     if (mergeFile == null)
       return EMPTY_MAP;
     
     return Collections.singletonMap(mergeFile, dfv);
   }
   
-  MinorCompactor(Configuration conf, FileSystem fs, InMemoryMap imm, String mergeFile, DataFileValue dfv, String outputFile, TableConfiguration acuTableConf,
+  MinorCompactor(Configuration conf, VolumeManager fs, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, TableConfiguration acuTableConf,
       KeyExtent extent, MinorCompactionReason mincReason) {
     super(conf, fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new CompactionEnv() {
       
@@ -126,7 +127,7 @@ public class MinorCompactor extends Comp
         // clean up
         try {
           if (getFileSystem().exists(new Path(getOutputFile()))) {
-            getFileSystem().delete(new Path(getOutputFile()), true);
+            getFileSystem().deleteRecursively(new Path(getOutputFile()));
           }
         } catch (IOException e) {
           log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());



Mime
View raw message